From 1af94df6b74aeb4f6ebcbe80e074b4cb252e62e3 Mon Sep 17 00:00:00 2001 From: Tim Date: Thu, 23 May 2024 11:53:30 +1200 Subject: [PATCH] improve worker pool error reporting and finalization (#2819) --- .changeset/dry-waves-begin.md | 5 ++++ .changeset/eighty-mugs-sneeze.md | 9 ++++++ .changeset/stale-beers-promise.md | 5 ++++ packages/effect/src/internal/pool.ts | 15 ++++++---- packages/effect/test/Pool.test.ts | 21 +++++++++++++ .../platform-browser/src/internal/worker.ts | 5 +++- packages/platform/src/Worker.ts | 11 ++++--- packages/platform/src/WorkerError.ts | 21 ++----------- packages/platform/src/internal/worker.ts | 30 +++++++++---------- 9 files changed, 76 insertions(+), 46 deletions(-) create mode 100644 .changeset/dry-waves-begin.md create mode 100644 .changeset/eighty-mugs-sneeze.md create mode 100644 .changeset/stale-beers-promise.md diff --git a/.changeset/dry-waves-begin.md b/.changeset/dry-waves-begin.md new file mode 100644 index 0000000000..ff832d1e72 --- /dev/null +++ b/.changeset/dry-waves-begin.md @@ -0,0 +1,5 @@ +--- +"effect": patch +--- + +ensure pool calls finalizer for failed acquisitions diff --git a/.changeset/eighty-mugs-sneeze.md b/.changeset/eighty-mugs-sneeze.md new file mode 100644 index 0000000000..c4052c9503 --- /dev/null +++ b/.changeset/eighty-mugs-sneeze.md @@ -0,0 +1,9 @@ +--- +"@effect/platform-browser": minor +"@effect/platform": minor +"@effect/platform-bun": minor +"@effect/platform-node": minor +"@effect/platform-node-shared": minor +--- + +remove `permits` from workers, to prevent issues with pool resizing diff --git a/.changeset/stale-beers-promise.md b/.changeset/stale-beers-promise.md new file mode 100644 index 0000000000..ea15d89b96 --- /dev/null +++ b/.changeset/stale-beers-promise.md @@ -0,0 +1,5 @@ +--- +"@effect/platform": patch +--- + +ensure worker pool construction errors are reported during creation diff --git a/packages/effect/src/internal/pool.ts b/packages/effect/src/internal/pool.ts index 8d1fb416a3..ff9d00fab4 100644 --- a/packages/effect/src/internal/pool.ts +++ b/packages/effect/src/internal/pool.ts @@ -218,12 +218,15 @@ class PoolImpl implements Pool.Pool { const release = (attempted: Attempted): Effect.Effect => core.exitMatch(attempted.result, { onFailure: () => - core.flatten(ref.modify(this.state, (state) => { - if (state.size <= this.min) { - return [allocateUinterruptible(this), { ...state, free: state.free + 1 }] as const - } - return [core.void, { ...state, size: state.size - 1 }] as const - })), + core.zipRight( + attempted.finalizer, + core.flatten(ref.modify(this.state, (state) => { + if (state.size <= this.min) { + return [allocateUinterruptible(this), { ...state, free: state.free + 1 }] as const + } + return [core.void, { ...state, size: state.size - 1 }] as const + })) + ), onSuccess: (item) => core.flatMap(ref.get(this.invalidated), (set) => { if (pipe(set, HashSet.has(item))) { diff --git a/packages/effect/test/Pool.test.ts b/packages/effect/test/Pool.test.ts index 09c472d57b..b557d8d50c 100644 --- a/packages/effect/test/Pool.test.ts +++ b/packages/effect/test/Pool.test.ts @@ -246,4 +246,25 @@ describe("Pool", () => { const result = yield* $(Fiber.interrupt(fiber)) expect(result).toEqual(Exit.interrupt(fiberId)) })) + + it.scoped("finalizer is called for failed allocations", () => + Effect.gen(function*() { + const scope = yield* Scope.make() + const count = yield* Ref.make(0) + const get = Effect.acquireRelease( + Ref.updateAndGet(count, (n) => n + 1), + () => Ref.update(count, (n) => n - 1) + ).pipe( + Effect.andThen(Effect.fail("boom")) + ) + const pool = yield* Pool.make({ acquire: get, size: 10 }).pipe( + Scope.extend(scope) + ) + yield* Effect.scoped(pool.get).pipe( + Effect.ignore + ) + expect(yield* Ref.get(count)).toBe(10) + yield* Scope.close(scope, Exit.void) + expect(yield* Ref.get(count)).toBe(0) + })) }) diff --git a/packages/platform-browser/src/internal/worker.ts b/packages/platform-browser/src/internal/worker.ts index 427f83b03c..87adb12bb9 100644 --- a/packages/platform-browser/src/internal/worker.ts +++ b/packages/platform-browser/src/internal/worker.ts @@ -1,5 +1,6 @@ import * as Worker from "@effect/platform/Worker" import { WorkerError } from "@effect/platform/WorkerError" +import * as Deferred from "effect/Deferred" import * as Effect from "effect/Effect" import * as Layer from "effect/Layer" import * as Queue from "effect/Queue" @@ -19,6 +20,7 @@ const platformWorkerImpl = Worker.PlatformWorker.of({ yield* _(Effect.addFinalizer(() => Effect.sync(() => port.postMessage([1])))) const queue = yield* _(Queue.unbounded>()) + const latch = yield* Deferred.make() const fiber = yield* _( Effect.async((resume) => { @@ -30,6 +32,7 @@ const platformWorkerImpl = Worker.PlatformWorker.of({ } port.addEventListener("message", onMessage as any) port.addEventListener("error", onError as any) + Deferred.unsafeDone(latch, Effect.void) return Effect.sync(() => { port.removeEventListener("message", onMessage as any) port.removeEventListener("error", onError as any) @@ -38,7 +41,7 @@ const platformWorkerImpl = Worker.PlatformWorker.of({ Effect.interruptible, Effect.forkScoped ) - yield* _(Effect.yieldNow()) + yield* Deferred.await(latch) if ("start" in port) { port.start() diff --git a/packages/platform/src/Worker.ts b/packages/platform/src/Worker.ts index 4e0ff5d44c..997eb148e0 100644 --- a/packages/platform/src/Worker.ts +++ b/packages/platform/src/Worker.ts @@ -106,7 +106,6 @@ export declare namespace Worker { export interface Options { readonly encode?: (message: I) => Effect.Effect readonly transfers?: (message: I) => ReadonlyArray - readonly permits?: number readonly queue?: WorkerQueue readonly initialMessage?: LazyArg } @@ -227,7 +226,7 @@ export const layerManager: Layer.Layer = i */ export const makePool: ( options: WorkerPool.Options -) => Effect.Effect, never, WorkerManager | Spawner | Scope.Scope> = internal.makePool +) => Effect.Effect, WorkerError, WorkerManager | Spawner | Scope.Scope> = internal.makePool /** * @since 1.0.0 @@ -236,7 +235,7 @@ export const makePool: ( export const makePoolLayer: ( tag: Context.Tag>, options: WorkerPool.Options -) => Layer.Layer = internal.makePoolLayer +) => Layer.Layer = internal.makePoolLayer /** * @since 1.0.0 @@ -277,7 +276,6 @@ export declare namespace SerializedWorker { * @category models */ export interface BaseOptions { - readonly permits?: number readonly queue?: WorkerQueue } } @@ -341,7 +339,8 @@ export const makeSerialized: ( */ export const makePoolSerialized: ( options: SerializedWorkerPool.Options -) => Effect.Effect, never, WorkerManager | Spawner | Scope.Scope> = internal.makePoolSerialized +) => Effect.Effect, WorkerError, WorkerManager | Spawner | Scope.Scope> = + internal.makePoolSerialized /** * @since 1.0.0 @@ -350,7 +349,7 @@ export const makePoolSerialized: ( export const makePoolSerializedLayer: ( tag: Context.Tag>, options: SerializedWorkerPool.Options -) => Layer.Layer = internal.makePoolSerializedLayer +) => Layer.Layer = internal.makePoolSerializedLayer /** * @since 1.0.0 diff --git a/packages/platform/src/WorkerError.ts b/packages/platform/src/WorkerError.ts index dc33da8c3f..4841a3c4b3 100644 --- a/packages/platform/src/WorkerError.ts +++ b/packages/platform/src/WorkerError.ts @@ -2,8 +2,7 @@ * @since 1.0.0 */ import * as Schema from "@effect/schema/Schema" -import * as Cause from "effect/Cause" -import { identity } from "effect/Function" +import type * as Cause from "effect/Cause" import * as Predicate from "effect/Predicate" import * as internal from "./internal/workerError.js" @@ -25,27 +24,13 @@ export type WorkerErrorTypeId = typeof WorkerErrorTypeId */ export const isWorkerError = (u: unknown): u is WorkerError => Predicate.hasProperty(u, WorkerErrorTypeId) -const causeDefectPretty: Schema.Schema = Schema.transform( - Schema.Unknown, - Schema.Unknown, - { - decode: identity, - encode: (defect) => { - if (Predicate.isObject(defect)) { - return Cause.pretty(Cause.die(defect)) - } - return String(defect) - } - } -) - /** * @since 1.0.0 * @category errors */ export class WorkerError extends Schema.TaggedError()("WorkerError", { reason: Schema.Literal("spawn", "decode", "send", "unknown", "encode"), - error: causeDefectPretty + error: Schema.CauseDefectUnknown }) { /** * @since 1.0.0 @@ -58,7 +43,7 @@ export class WorkerError extends Schema.TaggedError()("WorkerError" static readonly Cause: Schema.Schema< Cause.Cause, Schema.CauseEncoded - > = Schema.Cause({ defect: causeDefectPretty, error: this }) + > = Schema.Cause({ error: this }) /** * @since 1.0.0 diff --git a/packages/platform/src/internal/worker.ts b/packages/platform/src/internal/worker.ts index 3d221f9852..aa8d7eb795 100644 --- a/packages/platform/src/internal/worker.ts +++ b/packages/platform/src/internal/worker.ts @@ -67,7 +67,6 @@ export const makeManager = Effect.gen(function*() { spawn({ encode, initialMessage, - permits = 1, queue, transfers = (_) => [] }: Worker.Worker.Options) { @@ -75,7 +74,6 @@ export const makeManager = Effect.gen(function*() { const spawn = yield* _(Spawner) const id = idCounter++ let requestIdCounter = 0 - const semaphore = yield* Effect.makeSemaphore(permits) const requestMap = new Map< number, readonly [Queue.Queue, E | WorkerError>>, Deferred.Deferred] @@ -237,8 +235,7 @@ export const makeManager = Effect.gen(function*() { executeRelease ) - yield* semaphore.take(1).pipe( - Effect.zipRight(outbound.take), + yield* outbound.take.pipe( Effect.flatMap(([id, request, span]) => pipe( Effect.suspend(() => { @@ -261,7 +258,6 @@ export const makeManager = Effect.gen(function*() { Effect.zipRight(Deferred.await(result[1])) ) }), - Effect.ensuring(semaphore.release(1)), Effect.fork ) ), @@ -318,19 +314,21 @@ export const makePool = ( discard: true }), execute: (message: I) => - Stream.unwrap( + Stream.unwrapScoped( Effect.map( - Effect.scoped(backing.get), + backing.get, (worker) => worker.execute(message) ) ), executeEffect: (message: I) => - Effect.flatMap( - Effect.scoped(backing.get), - (worker) => worker.executeEffect(message) + Effect.scoped( + Effect.flatMap(backing.get, (worker) => worker.executeEffect(message)) ) } + // report any spawn errors + yield* Effect.scoped(backing.get) + return pool }) @@ -418,19 +416,21 @@ export const makePoolSerialized = ( discard: true }) as any, execute: (message: Req) => - Stream.unwrap( + Stream.unwrapScoped( Effect.map( - Effect.scoped(backing.get), + backing.get, (worker) => worker.execute(message) ) ) as any, executeEffect: (message: Req) => - Effect.flatMap( - Effect.scoped(backing.get), - (worker) => worker.executeEffect(message) + Effect.scoped( + Effect.flatMap(backing.get, (worker) => worker.executeEffect(message)) ) as any } + // report any spawn errors + yield* Effect.scoped(backing.get) + return pool })