Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial Queues Support #354

Merged
merged 13 commits into from Sep 7, 2022
25 changes: 25 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 27 additions & 4 deletions packages/cache/test/plugin.spec.ts
Expand Up @@ -7,11 +7,13 @@ import {
CachedMeta,
NoOpCache,
} from "@miniflare/cache";
import { QueueBroker } from "@miniflare/queues";
import {
Compatibility,
LogLevel,
NoOpLog,
PluginContext,
QueueEventDispatcher,
StoredValueMeta,
} from "@miniflare/shared";
import {
Expand All @@ -31,8 +33,16 @@ import { testResponse } from "./helpers";
const log = new NoOpLog();
const compat = new Compatibility();
const rootPath = process.cwd();
const ctx: PluginContext = { log, compat, rootPath, globalAsyncIO: true };

const queueBroker = new QueueBroker();
const queueEventDispatcher: QueueEventDispatcher = async (_batch) => {};
const ctx: PluginContext = {
log,
compat,
rootPath,
queueBroker,
queueEventDispatcher,
globalAsyncIO: true,
};
test("CacheStorage: provides default cache", async (t) => {
const factory = new MemoryStorageFactory();
const caches = new CacheStorage({}, log, factory, {});
Expand Down Expand Up @@ -208,14 +218,27 @@ test("CachePlugin: setup: Responses parse files in FormData as File objects only
});
test("CachePlugin: setup: operations throw outside request handler unless globalAsyncIO set", async (t) => {
const factory = new MemoryStorageFactory();
let plugin = new CachePlugin({ log, compat, rootPath });
let plugin = new CachePlugin({
log,
compat,
rootPath,
queueBroker,
queueEventDispatcher,
});
let caches: CacheStorage = plugin.setup(factory).globals?.caches;
await t.throwsAsync(caches.default.match("http://localhost"), {
instanceOf: Error,
message: /^Some functionality, such as asynchronous I\/O/,
});

plugin = new CachePlugin({ log, compat, rootPath, globalAsyncIO: true });
plugin = new CachePlugin({
log,
compat,
rootPath,
globalAsyncIO: true,
queueBroker,
queueEventDispatcher,
});
caches = plugin.setup(factory).globals?.caches;
await caches.default.match("http://localhost");
});
41 changes: 41 additions & 0 deletions packages/core/src/index.ts
@@ -1,12 +1,14 @@
import fs from "fs/promises";
import path from "path";
import { URL } from "url";
import { QueueBroker } from "@miniflare/queues";
import {
AdditionalModules,
BeforeSetupResult,
Compatibility,
Context,
Log,
MessageBatch,
Mutex,
Options,
PluginContext,
Expand Down Expand Up @@ -49,8 +51,10 @@ import {
ServiceWorkerGlobalScope,
_kLoopHeader,
kAddModuleFetchListener,
kAddModuleQueueListener,
kAddModuleScheduledListener,
kDispatchFetch,
kDispatchQueue,
kDispatchScheduled,
kDispose,
withImmutableHeaders,
Expand Down Expand Up @@ -216,6 +220,7 @@ function throwNoScriptError(modules?: boolean) {
export interface MiniflareCoreContext {
log: Log;
storageFactory: StorageFactory;
queueBroker: QueueBroker;
scriptRunner?: ScriptRunner;
scriptRequired?: boolean;
scriptRunForModuleExports?: boolean;
Expand Down Expand Up @@ -400,13 +405,26 @@ export class MiniflareCore<
} else {
this.#compat = new Compatibility(compatibilityDate, compatibilityFlags);
}

const queueBroker = this.#ctx.queueBroker;
const queueEventDispatcher = async (batch: MessageBatch) => {
await this.dispatchQueue(batch);

// TODO(soon) detect success vs failure during processing
this.#ctx.log.info(
`${batch.queue} (${batch.messages.length} Messages) OK`
);
};

const ctx: PluginContext = {
log: this.#ctx.log,
compat: this.#compat,
rootPath,
usageModel,
globalAsyncIO,
fetchMock,
queueEventDispatcher,
queueBroker,
};

// Log options and compatibility flags every time they might've changed
Expand Down Expand Up @@ -782,6 +800,11 @@ export class MiniflareCore<
if (scheduledListener) {
globalScope[kAddModuleScheduledListener](scheduledListener);
}

const queueListener = defaults?.queue?.bind(defaults);
if (queueListener) {
globalScope[kAddModuleQueueListener](queueListener);
}
}
}

Expand Down Expand Up @@ -1118,6 +1141,24 @@ export class MiniflareCore<
);
}

async dispatchQueue<WaitUntil extends any[] = unknown[]>(
batch: MessageBatch
): Promise<WaitUntil> {
await this.#initPromise;

const { usageModel } = this.#instances!.CorePlugin;
const globalScope = this.#globalScope;

// Each fetch gets its own context (e.g. 50 subrequests).
// Start a new pipeline too.
return new RequestContext({
externalSubrequestLimit: usageModelExternalSubrequestLimit(usageModel),
}).runWith(() => {
const result = globalScope![kDispatchQueue]<WaitUntil>(batch);
return result;
});
}

async dispose(): Promise<void> {
// Ensure initialisation complete before disposing
// (see https://github.com/cloudflare/miniflare/issues/341)
Expand Down
7 changes: 1 addition & 6 deletions packages/core/src/plugins/core.ts
Expand Up @@ -22,7 +22,6 @@ import {
import webStreams from "stream/web";
import { URL, URLSearchParams } from "url";
import { TextDecoder, TextEncoder } from "util";
import { deserialize, serialize } from "v8";
import {
CompatibilityFlag,
Context,
Expand All @@ -37,6 +36,7 @@ import {
STRING_SCRIPT_PATH,
SetupResult,
globsToMatcher,
structuredCloneBuffer,
} from "@miniflare/shared";
import { File, FormData, Headers, MockAgent } from "undici";
// @ts-expect-error `urlpattern-polyfill` only provides global types
Expand Down Expand Up @@ -99,11 +99,6 @@ function proxyDisableStreamConstructor<
});
}

// Approximation of structuredClone for Node < 17.0.0
function structuredCloneBuffer<T>(value: T): T {
return deserialize(serialize(value));
}

export interface CoreOptions {
script?: string;
scriptPath?: string;
Expand Down
46 changes: 44 additions & 2 deletions packages/core/src/standards/event.ts
Expand Up @@ -2,6 +2,7 @@ import {
Awaitable,
Context,
Log,
MessageBatch,
MiniflareError,
ThrowingEventTarget,
TypedEventListener,
Expand Down Expand Up @@ -106,10 +107,27 @@ export class ScheduledEvent extends Event {
}
}

export class QueueEvent extends Event {
readonly batch: MessageBatch;
readonly [kWaitUntil]: Promise<unknown>[] = [];

constructor(type: "queue", init: { batch: MessageBatch }) {
super(type);
this.batch = init.batch;
}

waitUntil(promise: Promise<any>): void {
if (!(this instanceof QueueEvent)) {
throw new TypeError("Illegal invocation");
}
this[kWaitUntil].push(promise);
}
}

export class ExecutionContext {
readonly #event: FetchEvent | ScheduledEvent;
readonly #event: FetchEvent | ScheduledEvent | QueueEvent;

constructor(event: FetchEvent | ScheduledEvent) {
constructor(event: FetchEvent | ScheduledEvent | QueueEvent) {
this.#event = event;
}

Expand Down Expand Up @@ -147,12 +165,20 @@ export type ModuleScheduledListener = (
ctx: ExecutionContext
) => any;

export type ModuleQueueListener = (
batch: MessageBatch,
env: Context,
ctx: ExecutionContext
) => any;

export const kAddModuleFetchListener = Symbol("kAddModuleFetchListener");
export const kAddModuleScheduledListener = Symbol(
"kAddModuleScheduledListener"
);
export const kAddModuleQueueListener = Symbol("kAddModuleQueueListener");
export const kDispatchFetch = Symbol("kDispatchFetch");
export const kDispatchScheduled = Symbol("kDispatchScheduled");
export const kDispatchQueue = Symbol("kDispatchQueue");
export const kDispose = Symbol("kDispose");

export class PromiseRejectionEvent extends Event {
Expand All @@ -172,6 +198,7 @@ export class PromiseRejectionEvent extends Event {
export type WorkerGlobalScopeEventMap = {
fetch: FetchEvent;
scheduled: ScheduledEvent;
queue: QueueEvent;
unhandledrejection: PromiseRejectionEvent;
rejectionhandled: PromiseRejectionEvent;
};
Expand Down Expand Up @@ -334,6 +361,13 @@ export class ServiceWorkerGlobalScope extends WorkerGlobalScope {
});
}

[kAddModuleQueueListener](listener: ModuleQueueListener): void {
super.addEventListener("queue", (e) => {
const res = listener(e.batch, this.#bindings, new ExecutionContext(e));
if (res !== undefined) e.waitUntil(Promise.resolve(res));
});
}

async [kDispatchFetch]<WaitUntil extends any[] = unknown[]>(
request: Request,
proxy = false
Expand Down Expand Up @@ -418,6 +452,14 @@ export class ServiceWorkerGlobalScope extends WorkerGlobalScope {
return (await Promise.all(event[kWaitUntil])) as WaitUntil;
}

async [kDispatchQueue]<WaitUntil extends any[] = any[]>(
batch: MessageBatch
): Promise<WaitUntil> {
const event = new QueueEvent("queue", { batch });
super.dispatchEvent(event);
return (await Promise.all(event[kWaitUntil])) as WaitUntil;
}

// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
#maybeAddPromiseListener(listener: PromiseListener, member: any): void {
if (listener.set.size === 0) {
Expand Down
5 changes: 5 additions & 0 deletions packages/core/test/index.mounts.spec.ts
Expand Up @@ -16,6 +16,7 @@ import {
import { DurableObjectsPlugin } from "@miniflare/durable-objects";
import { HTTPPlugin, createServer } from "@miniflare/http-server";
import { KVPlugin } from "@miniflare/kv";
import { QueueBroker } from "@miniflare/queues";
import { VMScriptRunner } from "@miniflare/runner-vm";
import { LogLevel, NoOpLog, StoredValueMeta } from "@miniflare/shared";
import {
Expand Down Expand Up @@ -248,6 +249,7 @@ test("MiniflareCore: #init: doesn't throw if script required, parent script not
storageFactory: new MemoryStorageFactory(),
scriptRunner: new VMScriptRunner(),
scriptRequired: true,
queueBroker: new QueueBroker(),
};

const mf = new MiniflareCore({ CorePlugin }, ctx, {
Expand All @@ -262,6 +264,7 @@ test("MiniflareCore: #init: logs reload errors when mount options update instead
log,
storageFactory: new MemoryStorageFactory(),
scriptRunner: new VMScriptRunner(),
queueBroker: new QueueBroker(),
};
const mf = new MiniflareCore({ CorePlugin, DurableObjectsPlugin }, ctx, {
mounts: { a: { script: "//" } },
Expand Down Expand Up @@ -780,6 +783,7 @@ test("MiniflareCore: runs mounted worker script for Durable Object classes used
storageFactory: new MemoryStorageFactory(),
scriptRunner: new VMScriptRunner(),
scriptRunForModuleExports: true,
queueBroker: new QueueBroker(),
},
{
modules: true,
Expand Down Expand Up @@ -823,6 +827,7 @@ test("MiniflareCore: can access Durable Objects defined in parent or other mount
log: new NoOpLog(),
storageFactory: new MemoryStorageFactory(),
scriptRunner: new VMScriptRunner(),
queueBroker: new QueueBroker(),
},
{
name: "parent",
Expand Down