Skip to content

Commit

Permalink
improve worker pool error reporting and finalization (#2819)
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart committed May 22, 2024
1 parent c07e0ce commit 1af94df
Show file tree
Hide file tree
Showing 9 changed files with 76 additions and 46 deletions.
5 changes: 5 additions & 0 deletions .changeset/dry-waves-begin.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"effect": patch
---

ensure pool calls finalizer for failed acquisitions
9 changes: 9 additions & 0 deletions .changeset/eighty-mugs-sneeze.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
"@effect/platform-browser": minor
"@effect/platform": minor
"@effect/platform-bun": minor
"@effect/platform-node": minor
"@effect/platform-node-shared": minor
---

remove `permits` from workers, to prevent issues with pool resizing
5 changes: 5 additions & 0 deletions .changeset/stale-beers-promise.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@effect/platform": patch
---

ensure worker pool construction errors are reported during creation
15 changes: 9 additions & 6 deletions packages/effect/src/internal/pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -218,12 +218,15 @@ class PoolImpl<in out A, in out E> implements Pool.Pool<A, E> {
const release = (attempted: Attempted<A, E>): Effect.Effect<unknown> =>
core.exitMatch(attempted.result, {
onFailure: () =>
core.flatten(ref.modify(this.state, (state) => {
if (state.size <= this.min) {
return [allocateUinterruptible(this), { ...state, free: state.free + 1 }] as const
}
return [core.void, { ...state, size: state.size - 1 }] as const
})),
core.zipRight(
attempted.finalizer,
core.flatten(ref.modify(this.state, (state) => {
if (state.size <= this.min) {
return [allocateUinterruptible(this), { ...state, free: state.free + 1 }] as const
}
return [core.void, { ...state, size: state.size - 1 }] as const
}))
),
onSuccess: (item) =>
core.flatMap(ref.get(this.invalidated), (set) => {
if (pipe(set, HashSet.has(item))) {
Expand Down
21 changes: 21 additions & 0 deletions packages/effect/test/Pool.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -246,4 +246,25 @@ describe("Pool", () => {
const result = yield* $(Fiber.interrupt(fiber))
expect(result).toEqual(Exit.interrupt(fiberId))
}))

it.scoped("finalizer is called for failed allocations", () =>
Effect.gen(function*() {
const scope = yield* Scope.make()
const count = yield* Ref.make(0)
const get = Effect.acquireRelease(
Ref.updateAndGet(count, (n) => n + 1),
() => Ref.update(count, (n) => n - 1)
).pipe(
Effect.andThen(Effect.fail("boom"))
)
const pool = yield* Pool.make({ acquire: get, size: 10 }).pipe(
Scope.extend(scope)
)
yield* Effect.scoped(pool.get).pipe(
Effect.ignore
)
expect(yield* Ref.get(count)).toBe(10)
yield* Scope.close(scope, Exit.void)
expect(yield* Ref.get(count)).toBe(0)
}))
})
5 changes: 4 additions & 1 deletion packages/platform-browser/src/internal/worker.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import * as Worker from "@effect/platform/Worker"
import { WorkerError } from "@effect/platform/WorkerError"
import * as Deferred from "effect/Deferred"
import * as Effect from "effect/Effect"
import * as Layer from "effect/Layer"
import * as Queue from "effect/Queue"
Expand All @@ -19,6 +20,7 @@ const platformWorkerImpl = Worker.PlatformWorker.of({
yield* _(Effect.addFinalizer(() => Effect.sync(() => port.postMessage([1]))))

const queue = yield* _(Queue.unbounded<Worker.BackingWorker.Message<O>>())
const latch = yield* Deferred.make<void>()

const fiber = yield* _(
Effect.async<never, WorkerError, never>((resume) => {
Expand All @@ -30,6 +32,7 @@ const platformWorkerImpl = Worker.PlatformWorker.of({
}
port.addEventListener("message", onMessage as any)
port.addEventListener("error", onError as any)
Deferred.unsafeDone(latch, Effect.void)
return Effect.sync(() => {
port.removeEventListener("message", onMessage as any)
port.removeEventListener("error", onError as any)
Expand All @@ -38,7 +41,7 @@ const platformWorkerImpl = Worker.PlatformWorker.of({
Effect.interruptible,
Effect.forkScoped
)
yield* _(Effect.yieldNow())
yield* Deferred.await(latch)

if ("start" in port) {
port.start()
Expand Down
11 changes: 5 additions & 6 deletions packages/platform/src/Worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ export declare namespace Worker {
export interface Options<I> {
readonly encode?: (message: I) => Effect.Effect<unknown, WorkerError>
readonly transfers?: (message: I) => ReadonlyArray<unknown>
readonly permits?: number
readonly queue?: WorkerQueue<I>
readonly initialMessage?: LazyArg<I>
}
Expand Down Expand Up @@ -227,7 +226,7 @@ export const layerManager: Layer.Layer<WorkerManager, never, PlatformWorker> = i
*/
export const makePool: <I, O, E>(
options: WorkerPool.Options<I>
) => Effect.Effect<WorkerPool<I, O, E>, never, WorkerManager | Spawner | Scope.Scope> = internal.makePool
) => Effect.Effect<WorkerPool<I, O, E>, WorkerError, WorkerManager | Spawner | Scope.Scope> = internal.makePool

/**
* @since 1.0.0
Expand All @@ -236,7 +235,7 @@ export const makePool: <I, O, E>(
export const makePoolLayer: <Tag, I, O, E>(
tag: Context.Tag<Tag, WorkerPool<I, O, E>>,
options: WorkerPool.Options<I>
) => Layer.Layer<Tag, never, WorkerManager | Spawner> = internal.makePoolLayer
) => Layer.Layer<Tag, WorkerError, WorkerManager | Spawner> = internal.makePoolLayer

/**
* @since 1.0.0
Expand Down Expand Up @@ -277,7 +276,6 @@ export declare namespace SerializedWorker {
* @category models
*/
export interface BaseOptions<I> {
readonly permits?: number
readonly queue?: WorkerQueue<I>
}
}
Expand Down Expand Up @@ -341,7 +339,8 @@ export const makeSerialized: <I extends Schema.TaggedRequest.Any>(
*/
export const makePoolSerialized: <I extends Schema.TaggedRequest.Any>(
options: SerializedWorkerPool.Options<I>
) => Effect.Effect<SerializedWorkerPool<I>, never, WorkerManager | Spawner | Scope.Scope> = internal.makePoolSerialized
) => Effect.Effect<SerializedWorkerPool<I>, WorkerError, WorkerManager | Spawner | Scope.Scope> =
internal.makePoolSerialized

/**
* @since 1.0.0
Expand All @@ -350,7 +349,7 @@ export const makePoolSerialized: <I extends Schema.TaggedRequest.Any>(
export const makePoolSerializedLayer: <Tag, I extends Schema.TaggedRequest.Any>(
tag: Context.Tag<Tag, SerializedWorkerPool<I>>,
options: SerializedWorkerPool.Options<I>
) => Layer.Layer<Tag, never, WorkerManager | Spawner> = internal.makePoolSerializedLayer
) => Layer.Layer<Tag, WorkerError, WorkerManager | Spawner> = internal.makePoolSerializedLayer

/**
* @since 1.0.0
Expand Down
21 changes: 3 additions & 18 deletions packages/platform/src/WorkerError.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@
* @since 1.0.0
*/
import * as Schema from "@effect/schema/Schema"
import * as Cause from "effect/Cause"
import { identity } from "effect/Function"
import type * as Cause from "effect/Cause"
import * as Predicate from "effect/Predicate"
import * as internal from "./internal/workerError.js"

Expand All @@ -25,27 +24,13 @@ export type WorkerErrorTypeId = typeof WorkerErrorTypeId
*/
export const isWorkerError = (u: unknown): u is WorkerError => Predicate.hasProperty(u, WorkerErrorTypeId)

const causeDefectPretty: Schema.Schema<unknown> = Schema.transform(
Schema.Unknown,
Schema.Unknown,
{
decode: identity,
encode: (defect) => {
if (Predicate.isObject(defect)) {
return Cause.pretty(Cause.die(defect))
}
return String(defect)
}
}
)

/**
* @since 1.0.0
* @category errors
*/
export class WorkerError extends Schema.TaggedError<WorkerError>()("WorkerError", {
reason: Schema.Literal("spawn", "decode", "send", "unknown", "encode"),
error: causeDefectPretty
error: Schema.CauseDefectUnknown
}) {
/**
* @since 1.0.0
Expand All @@ -58,7 +43,7 @@ export class WorkerError extends Schema.TaggedError<WorkerError>()("WorkerError"
static readonly Cause: Schema.Schema<
Cause.Cause<WorkerError>,
Schema.CauseEncoded<WorkerErrorFrom>
> = Schema.Cause({ defect: causeDefectPretty, error: this })
> = Schema.Cause({ error: this })

/**
* @since 1.0.0
Expand Down
30 changes: 15 additions & 15 deletions packages/platform/src/internal/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,13 @@ export const makeManager = Effect.gen(function*() {
spawn<I, O, E>({
encode,
initialMessage,
permits = 1,
queue,
transfers = (_) => []
}: Worker.Worker.Options<I>) {
return Effect.gen(function*(_) {
const spawn = yield* _(Spawner)
const id = idCounter++
let requestIdCounter = 0
const semaphore = yield* Effect.makeSemaphore(permits)
const requestMap = new Map<
number,
readonly [Queue.Queue<Exit.Exit<ReadonlyArray<O>, E | WorkerError>>, Deferred.Deferred<void>]
Expand Down Expand Up @@ -237,8 +235,7 @@ export const makeManager = Effect.gen(function*() {
executeRelease
)

yield* semaphore.take(1).pipe(
Effect.zipRight(outbound.take),
yield* outbound.take.pipe(
Effect.flatMap(([id, request, span]) =>
pipe(
Effect.suspend(() => {
Expand All @@ -261,7 +258,6 @@ export const makeManager = Effect.gen(function*() {
Effect.zipRight(Deferred.await(result[1]))
)
}),
Effect.ensuring(semaphore.release(1)),
Effect.fork
)
),
Expand Down Expand Up @@ -318,19 +314,21 @@ export const makePool = <I, O, E>(
discard: true
}),
execute: (message: I) =>
Stream.unwrap(
Stream.unwrapScoped(
Effect.map(
Effect.scoped(backing.get),
backing.get,
(worker) => worker.execute(message)
)
),
executeEffect: (message: I) =>
Effect.flatMap(
Effect.scoped(backing.get),
(worker) => worker.executeEffect(message)
Effect.scoped(
Effect.flatMap(backing.get, (worker) => worker.executeEffect(message))
)
}

// report any spawn errors
yield* Effect.scoped(backing.get)

return pool
})

Expand Down Expand Up @@ -418,19 +416,21 @@ export const makePoolSerialized = <I extends Schema.TaggedRequest.Any>(
discard: true
}) as any,
execute: <Req extends I>(message: Req) =>
Stream.unwrap(
Stream.unwrapScoped(
Effect.map(
Effect.scoped(backing.get),
backing.get,
(worker) => worker.execute(message)
)
) as any,
executeEffect: <Req extends I>(message: Req) =>
Effect.flatMap(
Effect.scoped(backing.get),
(worker) => worker.executeEffect(message)
Effect.scoped(
Effect.flatMap(backing.get, (worker) => worker.executeEffect(message))
) as any
}

// report any spawn errors
yield* Effect.scoped(backing.get)

return pool
})

Expand Down

0 comments on commit 1af94df

Please sign in to comment.