Skip to content

Commit

Permalink
remove pool resizing in platform workers to enable concurrent access (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart committed May 23, 2024
1 parent af0543c commit 5133ca9
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 58 deletions.
9 changes: 9 additions & 0 deletions .changeset/tidy-cycles-end.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
"@effect/platform": minor
"@effect/platform-browser": minor
"@effect/platform-bun": minor
"@effect/platform-node": minor
"@effect/platform-node-shared": minor
---

remove pool resizing in platform workers to enable concurrent access
4 changes: 1 addition & 3 deletions packages/platform-bun/examples/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@ interface MyWorkerPool {
}
const Pool = Context.GenericTag<MyWorkerPool, Worker.WorkerPool<number, never, number>>("@app/MyWorkerPool")
const PoolLive = Worker.makePoolLayer(Pool, {
minSize: 0,
maxSize: OS.availableParallelism(),
timeToLive: 10000
size: OS.availableParallelism()
}).pipe(
Layer.provide(BunWorker.layer(() => new globalThis.Worker("./examples/worker/range.ts")))
)
Expand Down
6 changes: 1 addition & 5 deletions packages/platform-node/examples/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,7 @@ interface MyWorkerPool {
readonly _: unique symbol
}
const Pool = Context.GenericTag<MyWorkerPool, Worker.WorkerPool<number, never, number>>("@app/MyWorkerPool")
const PoolLive = Worker.makePoolLayer(Pool, {
minSize: 0,
maxSize: 3,
timeToLive: 30000
}).pipe(
const PoolLive = Worker.makePoolLayer(Pool, { size: 3 }).pipe(
Layer.provide(NodeWorker.layer(() => new WT.Worker("./examples/worker/range.ts")))
)

Expand Down
27 changes: 11 additions & 16 deletions packages/platform/src/Worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,11 @@ export declare namespace Worker {
* @category models
*/
export interface Options<I> {
readonly encode?: (message: I) => Effect.Effect<unknown, WorkerError>
readonly transfers?: (message: I) => ReadonlyArray<unknown>
readonly queue?: WorkerQueue<I>
readonly initialMessage?: LazyArg<I>
readonly encode?: ((message: I) => Effect.Effect<unknown, WorkerError>) | undefined
readonly transfers?: ((message: I) => ReadonlyArray<unknown>) | undefined
readonly queue?: WorkerQueue<I> | undefined
readonly initialMessage?: LazyArg<I> | undefined
readonly permits?: number | undefined
}

/**
Expand Down Expand Up @@ -156,17 +157,10 @@ export declare namespace WorkerPool {
* @since 1.0.0
* @category models
*/
export type Options<I> =
& Worker.Options<I>
& ({
readonly onCreate?: (worker: Worker<I, unknown, unknown>) => Effect.Effect<void, WorkerError>
readonly size: number
} | {
readonly onCreate?: (worker: Worker<I, unknown, unknown>) => Effect.Effect<void, WorkerError>
readonly minSize: number
readonly maxSize: number
readonly timeToLive: Duration.DurationInput
})
export interface Options<I> extends Worker.Options<I> {
readonly onCreate?: (worker: Worker<I, unknown, unknown>) => Effect.Effect<void, WorkerError>
readonly size: number
}
}

/**
Expand Down Expand Up @@ -276,7 +270,8 @@ export declare namespace SerializedWorker {
* @category models
*/
export interface BaseOptions<I> {
readonly queue?: WorkerQueue<I>
readonly permits?: number | undefined
readonly queue?: WorkerQueue<I> | undefined
}
}

Expand Down
51 changes: 17 additions & 34 deletions packages/platform/src/internal/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,15 @@ export const makeManager = Effect.gen(function*() {
spawn<I, O, E>({
encode,
initialMessage,
permits = 1,
queue,
transfers = (_) => []
}: Worker.Worker.Options<I>) {
return Effect.gen(function*(_) {
const spawn = yield* _(Spawner)
const id = idCounter++
let requestIdCounter = 0
const semaphore = Effect.unsafeMakeSemaphore(permits)
const requestMap = new Map<
number,
readonly [Queue.Queue<Exit.Exit<ReadonlyArray<O>, E | WorkerError>>, Deferred.Deferred<void>]
Expand Down Expand Up @@ -235,7 +237,8 @@ export const makeManager = Effect.gen(function*() {
executeRelease
)

yield* outbound.take.pipe(
yield* semaphore.take(1).pipe(
Effect.andThen(outbound.take),
Effect.flatMap(([id, request, span]) =>
pipe(
Effect.suspend(() => {
Expand All @@ -258,6 +261,7 @@ export const makeManager = Effect.gen(function*() {
Effect.zipRight(Deferred.await(result[1]))
)
}),
Effect.ensuring(semaphore.release(1)),
Effect.fork
)
),
Expand Down Expand Up @@ -295,39 +299,24 @@ export const makePool = <I, O, E>(
Effect.tap((worker) => Effect.addFinalizer(() => Effect.sync(() => workers.delete(worker)))),
options.onCreate ? Effect.tap(options.onCreate) : identity
)
const backing = yield* "timeToLive" in options ?
Pool.makeWithTTL({
acquire,
min: options.minSize,
max: options.maxSize,
timeToLive: options.timeToLive
}) :
Pool.make({
acquire,
size: options.size
})
const backing = yield* Pool.make({
acquire,
size: options.size
})
const get = Effect.scoped(backing.get)
const pool: Worker.WorkerPool<I, O, E> = {
backing,
broadcast: (message: I) =>
Effect.forEach(workers, (worker) => worker.executeEffect(message), {
concurrency: "unbounded",
discard: true
}),
execute: (message: I) =>
Stream.unwrapScoped(
Effect.map(
backing.get,
(worker) => worker.execute(message)
)
),
executeEffect: (message: I) =>
Effect.scoped(
Effect.flatMap(backing.get, (worker) => worker.executeEffect(message))
)
execute: (message: I) => Stream.unwrap(Effect.map(get, (worker) => worker.execute(message))),
executeEffect: (message: I) => Effect.flatMap(get, (worker) => worker.executeEffect(message))
}

// report any spawn errors
yield* Effect.scoped(backing.get)
yield* get

return pool
})
Expand Down Expand Up @@ -408,6 +397,7 @@ export const makePoolSerialized = <I extends Schema.TaggedRequest.Any>(
acquire,
size: options.size
})
const get = Effect.scoped(backing.get)
const pool: Worker.SerializedWorkerPool<I> = {
backing,
broadcast: <Req extends I>(message: Req) =>
Expand All @@ -416,20 +406,13 @@ export const makePoolSerialized = <I extends Schema.TaggedRequest.Any>(
discard: true
}) as any,
execute: <Req extends I>(message: Req) =>
Stream.unwrapScoped(
Effect.map(
backing.get,
(worker) => worker.execute(message)
)
) as any,
Stream.unwrap(Effect.map(get, (worker) => worker.execute(message))) as any,
executeEffect: <Req extends I>(message: Req) =>
Effect.scoped(
Effect.flatMap(backing.get, (worker) => worker.executeEffect(message))
) as any
Effect.flatMap(get, (worker) => worker.executeEffect(message)) as any
}

// report any spawn errors
yield* Effect.scoped(backing.get)
yield* get

return pool
})
Expand Down

0 comments on commit 5133ca9

Please sign in to comment.