diff --git a/.changeset/brave-eggs-allow.md b/.changeset/brave-eggs-allow.md new file mode 100644 index 0000000000..18d719c1d8 --- /dev/null +++ b/.changeset/brave-eggs-allow.md @@ -0,0 +1,7 @@ +--- +"effect": minor +--- + +close FiberHandle/FiberSet/FiberMap when it is released + +When they are closed, fibers can no longer be added to them. diff --git a/.changeset/odd-ears-build.md b/.changeset/odd-ears-build.md new file mode 100644 index 0000000000..4aa94a46ed --- /dev/null +++ b/.changeset/odd-ears-build.md @@ -0,0 +1,5 @@ +--- +"effect": patch +--- + +add FiberMap.has/unsafeHas api diff --git a/.changeset/sweet-apples-applaud.md b/.changeset/sweet-apples-applaud.md new file mode 100644 index 0000000000..9031d76af4 --- /dev/null +++ b/.changeset/sweet-apples-applaud.md @@ -0,0 +1,24 @@ +--- +"effect": patch +--- + +add FiberHandle module, for holding a reference to a running fiber + +```ts +import { Effect, FiberHandle } from "effect" + +Effect.gen(function* (_) { + const handle = yield* _(FiberHandle.make()) + + // run some effects + yield* _(FiberHandle.run(handle, Effect.never)) + // this will interrupt the previous fiber + yield* _(FiberHandle.run(handle, Effect.never)) + // this will not run, as a fiber is already running + yield* _(FiberHandle.run(handle, Effect.never, { onlyIfMissing: true })) + + yield* _(Effect.sleep(1000)) +}).pipe( + Effect.scoped // The fiber will be interrupted when the scope is closed +) +``` diff --git a/packages/effect/src/FiberHandle.ts b/packages/effect/src/FiberHandle.ts new file mode 100644 index 0000000000..e1a909fdc9 --- /dev/null +++ b/packages/effect/src/FiberHandle.ts @@ -0,0 +1,414 @@ +/** + * @since 2.0.0 + */ +import * as Effect from "effect/Effect" +import type * as Scope from "effect/Scope" +import type { NoSuchElementException } from "./Cause.js" +import * as Cause from "./Cause.js" +import * as Deferred from "./Deferred.js" +import * as Exit from "./Exit.js" +import * as Fiber from "./Fiber.js" +import * as FiberId from "./FiberId.js" +import * as FiberRef from "./FiberRef.js" +import { dual } from "./Function.js" +import * as Inspectable from "./Inspectable.js" +import type { FiberRuntime } from "./internal/fiberRuntime.js" +import * as Option from "./Option.js" +import { type Pipeable, pipeArguments } from "./Pipeable.js" +import * as Predicate from "./Predicate.js" +import * as Runtime from "./Runtime.js" + +/** + * @since 2.0.0 + * @categories type ids + */ +export const TypeId = Symbol.for("effect/FiberHandle") + +/** + * @since 2.0.0 + * @categories type ids + */ +export type TypeId = typeof TypeId + +/** + * @since 2.0.0 + * @categories models + */ +export interface FiberHandle extends Pipeable, Inspectable.Inspectable { + readonly [TypeId]: TypeId + readonly deferred: Deferred.Deferred + /** @internal */ + state: { + readonly _tag: "Open" + fiber: Fiber.RuntimeFiber | undefined + } | { + readonly _tag: "Closed" + } +} + +/** + * @since 2.0.0 + * @categories refinements + */ +export const isFiberHandle = (u: unknown): u is FiberHandle => Predicate.hasProperty(u, TypeId) + +const Proto = { + [TypeId]: TypeId, + toString(this: FiberHandle) { + return Inspectable.format(this.toJSON()) + }, + toJSON(this: FiberHandle) { + return { + _id: "FiberHandle", + state: this.state + } + }, + [Inspectable.NodeInspectSymbol](this: FiberHandle) { + return this.toJSON() + }, + pipe() { + return pipeArguments(this, arguments) + } +} + +const unsafeMake = ( + deferred: Deferred.Deferred +): FiberHandle => { + const self = Object.create(Proto) + self.state = { _tag: "Open", fiber: undefined } + self.deferred = deferred + return self +} + +/** + * A FiberHandle can be used to store a single fiber. + * When the associated Scope is closed, the contained fiber will be interrupted. + * + * You can add a fiber to the handle using `FiberHandle.run`, and the fiber will + * be automatically removed from the FiberHandle when it completes. + * + * @example + * import { Effect, FiberHandle } from "effect" + * + * Effect.gen(function*(_) { + * const handle = yield* _(FiberHandle.make()) + * + * // run some effects + * yield* _(FiberHandle.run(handle, Effect.never)) + * // this will interrupt the previous fiber + * yield* _(FiberHandle.run(handle, Effect.never)) + * + * yield* _(Effect.sleep(1000)) + * }).pipe( + * Effect.scoped // The fiber will be interrupted when the scope is closed + * ) + * + * @since 2.0.0 + * @categories constructors + */ +export const make = (): Effect.Effect, never, Scope.Scope> => + Effect.acquireRelease( + Effect.map(Deferred.make(), (deferred) => unsafeMake(deferred)), + (handle) => + Effect.zipRight( + clear(handle), + Effect.suspend(() => { + handle.state = { _tag: "Closed" } + return Deferred.done(handle.deferred, Exit.void) + }) + ) + ) + +/** + * Create an Effect run function that is backed by a FiberHandle. + * + * @since 2.0.0 + * @categories constructors + */ +export const makeRuntime = (): Effect.Effect< + ( + effect: Effect.Effect, + options?: + | Runtime.RunForkOptions & { + readonly onlyIfMissing?: boolean | undefined + } + | undefined + ) => Fiber.RuntimeFiber, + never, + Scope.Scope | R +> => + Effect.flatMap( + make(), + (self) => runtime(self)() + ) + +/** + * Set the fiber in a FiberHandle. When the fiber completes, it will be removed from the FiberHandle. + * If a fiber is already running, it will be interrupted unless `options.onlyIfMissing` is set. + * + * @since 2.0.0 + * @categories combinators + */ +export const unsafeSet: { + ( + fiber: Fiber.RuntimeFiber, + options?: { + readonly interruptAs?: FiberId.FiberId | undefined + readonly onlyIfMissing?: boolean | undefined + } + ): (self: FiberHandle) => void + ( + self: FiberHandle, + fiber: Fiber.RuntimeFiber, + options?: { + readonly interruptAs?: FiberId.FiberId | undefined + readonly onlyIfMissing?: boolean | undefined + } + ): void +} = dual((args) => isFiberHandle(args[0]), ( + self: FiberHandle, + fiber: Fiber.RuntimeFiber, + options?: { + readonly interruptAs?: FiberId.FiberId | undefined + readonly onlyIfMissing?: boolean | undefined + } +): void => { + if (self.state._tag === "Closed") { + fiber.unsafeInterruptAsFork(options?.interruptAs ?? FiberId.none) + return + } else if (self.state.fiber !== undefined) { + if (options?.onlyIfMissing === true) { + fiber.unsafeInterruptAsFork(options?.interruptAs ?? FiberId.none) + return + } else if (self.state.fiber === fiber) { + return + } + self.state.fiber.unsafeInterruptAsFork(options?.interruptAs ?? FiberId.none) + self.state.fiber === undefined + } + + ;(fiber as FiberRuntime).setFiberRef(FiberRef.unhandledErrorLogLevel, Option.none()) + self.state.fiber = fiber + fiber.addObserver((exit) => { + if (self.state._tag === "Open" && fiber === self.state.fiber) { + self.state.fiber = undefined + } + if (Exit.isFailure(exit) && !Cause.isInterruptedOnly(exit.cause)) { + Deferred.unsafeDone(self.deferred, exit as any) + } + }) +}) + +/** + * Set the fiber in the FiberHandle. When the fiber completes, it will be removed from the FiberHandle. + * If a fiber already exists in the FiberHandle, it will be interrupted unless `options.onlyIfMissing` is set. + * + * @since 2.0.0 + * @categories combinators + */ +export const set: { + ( + fiber: Fiber.RuntimeFiber, + options?: { + readonly onlyIfMissing?: boolean + } + ): (self: FiberHandle) => Effect.Effect + ( + self: FiberHandle, + fiber: Fiber.RuntimeFiber, + options?: { + readonly onlyIfMissing?: boolean + } + ): Effect.Effect +} = dual((args) => isFiberHandle(args[0]), ( + self: FiberHandle, + fiber: Fiber.RuntimeFiber, + options?: { + readonly onlyIfMissing?: boolean + } +): Effect.Effect => + Effect.fiberIdWith( + (fiberId) => + Effect.sync(() => + unsafeSet(self, fiber, { + interruptAs: fiberId, + onlyIfMissing: options?.onlyIfMissing + }) + ) + )) + +/** + * Retrieve the fiber from the FiberHandle. + * + * @since 2.0.0 + * @categories combinators + */ +export const unsafeGet = (self: FiberHandle): Option.Option> => + self.state._tag === "Closed" ? Option.none() : Option.fromNullable(self.state.fiber) + +/** + * Retrieve the fiber from the FiberHandle. + * + * @since 2.0.0 + * @categories combinators + */ +export const get = (self: FiberHandle): Effect.Effect, NoSuchElementException> => + Effect.suspend(() => unsafeGet(self)) + +/** + * @since 2.0.0 + * @categories combinators + */ +export const clear = (self: FiberHandle): Effect.Effect => + Effect.uninterruptibleMask((restore) => + Effect.suspend(() => { + if (self.state._tag === "Closed" || self.state.fiber === undefined) { + return Effect.void + } + return Effect.zipRight( + restore(Fiber.interrupt(self.state.fiber)), + Effect.sync(() => { + if (self.state._tag === "Open") { + self.state.fiber = undefined + } + }) + ) + }) + ) + +/** + * Run an Effect and add the forked fiber to the FiberHandle. + * When the fiber completes, it will be removed from the FiberHandle. + * + * @since 2.0.0 + * @categories combinators + */ +export const run: { + ( + self: FiberHandle, + options?: { + readonly onlyIfMissing?: boolean + } + ): ( + effect: Effect.Effect + ) => Effect.Effect, never, R> + ( + self: FiberHandle, + effect: Effect.Effect, + options?: { + readonly onlyIfMissing?: boolean + } + ): Effect.Effect, never, R> +} = function() { + const self = arguments[0] as FiberHandle + if (Effect.isEffect(arguments[1])) { + const effect = arguments[1] + const options = arguments[2] as { readonly onlyIfMissing?: boolean } | undefined + return Effect.suspend(() => { + if (self.state._tag === "Closed") { + return Effect.interrupt + } + return Effect.uninterruptibleMask((restore) => + Effect.tap( + restore(Effect.forkDaemon(effect)), + (fiber) => set(self, fiber, options) + ) + ) + }) as any + } + const options = arguments[1] as { readonly onlyIfMissing?: boolean } | undefined + return (effect: Effect.Effect) => + Effect.suspend(() => { + if (self.state._tag === "Closed") { + return Effect.interrupt + } + return Effect.uninterruptibleMask((restore) => + Effect.tap( + restore(Effect.forkDaemon(effect)), + (fiber) => set(self, fiber, options) + ) + ) + }) +} + +/** + * Capture a Runtime and use it to fork Effect's, adding the forked fibers to the FiberHandle. + * + * @example + * import { Context, Effect, FiberHandle } from "effect" + * + * interface Users { + * readonly _: unique symbol + * } + * const Users = Context.GenericTag> + * }>("Users") + * + * Effect.gen(function*(_) { + * const handle = yield* _(FiberHandle.make()) + * const run = yield* _(FiberHandle.runtime(handle)()) + * + * // run an effect and set the fiber in the handle + * run(Effect.andThen(Users, _ => _.getAll)) + * + * // this will interrupt the previous fiber + * run(Effect.andThen(Users, _ => _.getAll)) + * }).pipe( + * Effect.scoped // The fiber will be interrupted when the scope is closed + * ) + * + * @since 2.0.0 + * @categories combinators + */ +export const runtime: ( + self: FiberHandle +) => () => Effect.Effect< + ( + effect: Effect.Effect, + options?: + | Runtime.RunForkOptions & { + readonly onlyIfMissing?: boolean | undefined + } + | undefined + ) => Fiber.RuntimeFiber, + never, + R +> = (self: FiberHandle) => () => + Effect.map( + Effect.runtime(), + (runtime) => { + const runFork = Runtime.runFork(runtime) + return ( + effect: Effect.Effect, + options?: + | Runtime.RunForkOptions & { + readonly onlyIfMissing?: boolean | undefined + } + | undefined + ) => { + const fiber = runFork(effect, options) + unsafeSet(self, fiber, options) + return fiber + } + } + ) + +/** + * If any of the Fiber's in the handle terminate with a failure, + * the returned Effect will terminate with the first failure that occurred. + * + * @since 2.0.0 + * @categories combinators + * @example + * import { Effect, FiberHandle } from "effect"; + * + * Effect.gen(function* (_) { + * const handle = yield* _(FiberHandle.make()); + * yield* _(FiberHandle.set(handle, Effect.runFork(Effect.fail("error")))); + * + * // parent fiber will fail with "error" + * yield* _(FiberHandle.join(handle)); + * }); + */ +export const join = (self: FiberHandle): Effect.Effect => + Deferred.await(self.deferred as Deferred.Deferred) diff --git a/packages/effect/src/FiberMap.ts b/packages/effect/src/FiberMap.ts index 353c8580cc..eb4fd8d53c 100644 --- a/packages/effect/src/FiberMap.ts +++ b/packages/effect/src/FiberMap.ts @@ -13,6 +13,7 @@ import * as FiberRef from "./FiberRef.js" import { dual } from "./Function.js" import * as Inspectable from "./Inspectable.js" import type { FiberRuntime } from "./internal/fiberRuntime.js" +import * as Iterable from "./Iterable.js" import * as MutableHashMap from "./MutableHashMap.js" import * as Option from "./Option.js" import { type Pipeable, pipeArguments } from "./Pipeable.js" @@ -39,8 +40,14 @@ export interface FiberMap extends Pipeable, Inspectable.Inspectable, Iterable<[K, Fiber.RuntimeFiber]> { readonly [TypeId]: TypeId - readonly backing: MutableHashMap.MutableHashMap> - readonly deferred: Deferred.Deferred + readonly deferred: Deferred.Deferred + /** @internal */ + state: { + readonly _tag: "Open" + readonly backing: MutableHashMap.MutableHashMap> + } | { + readonly _tag: "Closed" + } } /** @@ -52,7 +59,10 @@ export const isFiberMap = (u: unknown): u is FiberMap => Predicate.hasP const Proto = { [TypeId]: TypeId, [Symbol.iterator](this: FiberMap) { - return this.backing[Symbol.iterator]() + if (this.state._tag === "Closed") { + return Iterable.empty() + } + return this.state.backing[Symbol.iterator]() }, toString(this: FiberMap) { return Inspectable.format(this.toJSON()) @@ -60,7 +70,7 @@ const Proto = { toJSON(this: FiberMap) { return { _id: "FiberMap", - backing: this.backing.toJSON() + state: this.state } }, [Inspectable.NodeInspectSymbol](this: FiberMap) { @@ -73,10 +83,10 @@ const Proto = { const unsafeMake = ( backing: MutableHashMap.MutableHashMap>, - deferred: Deferred.Deferred + deferred: Deferred.Deferred ): FiberMap => { const self = Object.create(Proto) - self.backing = backing + self.state = { _tag: "Open", backing } self.deferred = deferred return self } @@ -108,12 +118,19 @@ const unsafeMake = ( */ export const make = (): Effect.Effect, never, Scope.Scope> => Effect.acquireRelease( - Effect.map(Deferred.make(), (deferred) => + Effect.map(Deferred.make(), (deferred) => unsafeMake( MutableHashMap.empty(), deferred )), - clear + (map) => + Effect.zipRight( + clear(map), + Effect.suspend(() => { + map.state = { _tag: "Closed" } + return Deferred.done(map.deferred, Exit.void) + }) + ) ) /** @@ -126,7 +143,11 @@ export const makeRuntime = (): Effect.Effect< ( key: K, effect: Effect.Effect, - options?: Runtime.RunForkOptions | undefined + options?: + | Runtime.RunForkOptions & { + readonly onlyIfMissing?: boolean | undefined + } + | undefined ) => Fiber.RuntimeFiber, never, Scope.Scope | R @@ -147,41 +168,54 @@ export const unsafeSet: { ( key: K, fiber: Fiber.RuntimeFiber, - interruptAs?: FiberId.FiberId + options?: { + readonly interruptAs?: FiberId.FiberId | undefined + readonly onlyIfMissing?: boolean | undefined + } | undefined ): (self: FiberMap) => void ( self: FiberMap, key: K, fiber: Fiber.RuntimeFiber, - interruptAs?: FiberId.FiberId + options?: { + readonly interruptAs?: FiberId.FiberId | undefined + readonly onlyIfMissing?: boolean | undefined + } | undefined ): void -} = dual< - ( - key: K, - fiber: Fiber.RuntimeFiber, - interruptAs?: FiberId.FiberId - ) => (self: FiberMap) => void, - ( - self: FiberMap, - key: K, - fiber: Fiber.RuntimeFiber, - interruptAs?: FiberId.FiberId - ) => void ->((args) => isFiberMap(args[0]), (self, key, fiber, interruptAs) => { - const previous = MutableHashMap.get(self.backing, key) +} = dual((args) => isFiberMap(args[0]), ( + self: FiberMap, + key: K, + fiber: Fiber.RuntimeFiber, + options?: { + readonly interruptAs?: FiberId.FiberId | undefined + readonly onlyIfMissing?: boolean | undefined + } | undefined +): void => { + if (self.state._tag === "Closed") { + fiber.unsafeInterruptAsFork(options?.interruptAs ?? FiberId.none) + return + } + + const previous = MutableHashMap.get(self.state.backing, key) if (previous._tag === "Some") { - if (previous.value === fiber) { + if (options?.onlyIfMissing === true) { + fiber.unsafeInterruptAsFork(options?.interruptAs ?? FiberId.none) + return + } else if (previous.value === fiber) { return } - previous.value.unsafeInterruptAsFork(interruptAs ?? FiberId.none) + previous.value.unsafeInterruptAsFork(options?.interruptAs ?? FiberId.none) } ;(fiber as FiberRuntime).setFiberRef(FiberRef.unhandledErrorLogLevel, Option.none()) - MutableHashMap.set(self.backing, key, fiber) + MutableHashMap.set(self.state.backing, key, fiber) fiber.addObserver((exit) => { - const current = MutableHashMap.get(self.backing, key) + if (self.state._tag === "Closed") { + return + } + const current = MutableHashMap.get(self.state.backing, key) if (Option.isSome(current) && fiber === current.value) { - MutableHashMap.remove(self.backing, key) + MutableHashMap.remove(self.state.backing, key) } if (Exit.isFailure(exit) && !Cause.isInterruptedOnly(exit.cause)) { Deferred.unsafeDone(self.deferred, exit as any) @@ -199,26 +233,35 @@ export const unsafeSet: { export const set: { ( key: K, - fiber: Fiber.RuntimeFiber + fiber: Fiber.RuntimeFiber, + options?: { + readonly onlyIfMissing?: boolean | undefined + } | undefined ): (self: FiberMap) => Effect.Effect ( self: FiberMap, key: K, - fiber: Fiber.RuntimeFiber + fiber: Fiber.RuntimeFiber, + options?: { + readonly onlyIfMissing?: boolean | undefined + } | undefined ): Effect.Effect -} = dual< - ( - key: K, - fiber: Fiber.RuntimeFiber - ) => (self: FiberMap) => Effect.Effect, - ( - self: FiberMap, - key: K, - fiber: Fiber.RuntimeFiber - ) => Effect.Effect ->(3, (self, key, fiber) => +} = dual((args) => isFiberMap(args[0]), ( + self: FiberMap, + key: K, + fiber: Fiber.RuntimeFiber, + options?: { + readonly onlyIfMissing?: boolean | undefined + } | undefined +): Effect.Effect => Effect.fiberIdWith( - (fiberId) => Effect.sync(() => unsafeSet(self, key, fiber, fiberId)) + (fiberId) => + Effect.sync(() => + unsafeSet(self, key, fiber, { + interruptAs: fiberId, + onlyIfMissing: options?.onlyIfMissing + }) + ) )) /** @@ -238,7 +281,7 @@ export const unsafeGet: { self: FiberMap, key: K ) => Option.Option> ->(2, (self, key) => MutableHashMap.get(self.backing, key)) +>(2, (self, key) => self.state._tag === "Closed" ? Option.none() : MutableHashMap.get(self.state.backing, key)) /** * Retrieve a fiber from the FiberMap. @@ -257,7 +300,36 @@ export const get: { self: FiberMap, key: K ) => Effect.Effect, NoSuchElementException> ->(2, (self, key) => Effect.suspend(() => MutableHashMap.get(self.backing, key))) +>(2, (self, key) => Effect.suspend(() => unsafeGet(self, key))) + +/** + * Check if a key exists in the FiberMap. + * + * @since 2.0.0 + * @categories combinators + */ +export const unsafeHas: { + (key: K): (self: FiberMap) => boolean + (self: FiberMap, key: K): boolean +} = dual( + 2, + (self: FiberMap, key: K): boolean => + self.state._tag === "Closed" ? false : MutableHashMap.has(self.state.backing, key) +) + +/** + * Check if a key exists in the FiberMap. + * + * @since 2.0.0 + * @categories combinators + */ +export const has: { + (key: K): (self: FiberMap) => Effect.Effect + (self: FiberMap, key: K): Effect.Effect +} = dual( + 2, + (self: FiberMap, key: K): Effect.Effect => Effect.sync(() => unsafeHas(self, key)) +) /** * Remove a fiber from the FiberMap, interrupting it if it exists. @@ -278,11 +350,14 @@ export const remove: { ) => Effect.Effect >(2, (self, key) => Effect.suspend(() => { - const fiber = MutableHashMap.get(self.backing, key) + if (self.state._tag === "Closed") { + return Effect.void + } + const fiber = MutableHashMap.get(self.state.backing, key) if (fiber._tag === "None") { return Effect.void } - MutableHashMap.remove(self.backing, key) + // will be removed by the observer return Fiber.interrupt(fiber.value) })) @@ -291,12 +366,15 @@ export const remove: { * @categories combinators */ export const clear = (self: FiberMap): Effect.Effect => - Effect.zipRight( - Effect.forEach(self.backing, ([_, fiber]) => Fiber.interrupt(fiber)), - Effect.sync(() => { - MutableHashMap.clear(self.backing) - }) - ) + Effect.suspend(() => { + if (self.state._tag === "Closed") { + return Effect.void + } + + return Effect.forEach(self.state.backing, ([, fiber]) => + // will be removed by the observer + Fiber.interrupt(fiber)) + }) /** * Run an Effect and add the forked fiber to the FiberMap. @@ -308,32 +386,54 @@ export const clear = (self: FiberMap): Effect.Effect => export const run: { ( self: FiberMap, - key: K + key: K, + options?: { + readonly onlyIfMissing?: boolean | undefined + } | undefined ): ( effect: Effect.Effect ) => Effect.Effect, never, R> ( self: FiberMap, key: K, - effect: Effect.Effect + effect: Effect.Effect, + options?: { + readonly onlyIfMissing?: boolean | undefined + } | undefined ): Effect.Effect, never, R> } = function() { - if (arguments.length === 2) { + if (Effect.isEffect(arguments[2])) { const self = arguments[0] as FiberMap const key = arguments[1] - return (effect: Effect.Effect) => - Effect.tap( - Effect.forkDaemon(effect), - (fiber) => set(self, key, fiber) + const effect = arguments[2] as Effect.Effect + const options = arguments[3] as { readonly onlyIfMissing?: boolean } | undefined + return Effect.suspend(() => { + if (self.state._tag === "Closed") { + return Effect.interrupt + } + return Effect.uninterruptibleMask((restore) => + Effect.tap( + restore(Effect.forkDaemon(effect)), + (fiber) => set(self, key, fiber, options) + ) ) + }) as any } const self = arguments[0] as FiberMap const key = arguments[1] - const effect = arguments[2] as Effect.Effect - return Effect.tap( - Effect.forkDaemon(effect), - (fiber) => set(self, key, fiber) - ) as any + const options = arguments[2] as { readonly onlyIfMissing?: boolean } | undefined + return (effect: Effect.Effect) => + Effect.suspend(() => { + if (self.state._tag === "Closed") { + return Effect.interrupt + } + return Effect.uninterruptibleMask((restore) => + Effect.tap( + restore(Effect.forkDaemon(effect)), + (fiber) => set(self, key, fiber, options) + ) + ) + }) } /** @@ -369,7 +469,11 @@ export const runtime: ( ( key: K, effect: Effect.Effect, - options?: Runtime.RunForkOptions | undefined + options?: + | Runtime.RunForkOptions & { + readonly onlyIfMissing?: boolean | undefined + } + | undefined ) => Fiber.RuntimeFiber, never, R @@ -381,10 +485,14 @@ export const runtime: ( return ( key: K, effect: Effect.Effect, - options?: Runtime.RunForkOptions | undefined + options?: + | Runtime.RunForkOptions & { + readonly onlyIfMissing?: boolean | undefined + } + | undefined ) => { const fiber = runFork(effect, options) - unsafeSet(self, key, fiber) + unsafeSet(self, key, fiber, options) return fiber } } @@ -395,7 +503,7 @@ export const runtime: ( * @categories combinators */ export const size = (self: FiberMap): Effect.Effect => - Effect.sync(() => MutableHashMap.size(self.backing)) + Effect.sync(() => self.state._tag === "Closed" ? 0 : MutableHashMap.size(self.state.backing)) /** * Join all fibers in the FiberMap. If any of the Fiber's in the map terminate with a failure, @@ -414,5 +522,5 @@ export const size = (self: FiberMap): Effect.Effect => * yield* _(FiberMap.join(map)); * }); */ -export const join = (self: FiberMap): Effect.Effect => - Deferred.await(self.deferred as Deferred.Deferred) +export const join = (self: FiberMap): Effect.Effect => + Deferred.await(self.deferred as Deferred.Deferred) diff --git a/packages/effect/src/FiberSet.ts b/packages/effect/src/FiberSet.ts index 1457465cb8..7dca2844fe 100644 --- a/packages/effect/src/FiberSet.ts +++ b/packages/effect/src/FiberSet.ts @@ -2,6 +2,7 @@ * @since 2.0.0 */ import * as Effect from "effect/Effect" +import * as FiberId from "effect/FiberId" import type * as Scope from "effect/Scope" import * as Cause from "./Cause.js" import * as Deferred from "./Deferred.js" @@ -11,6 +12,7 @@ import * as FiberRef from "./FiberRef.js" import { dual } from "./Function.js" import * as Inspectable from "./Inspectable.js" import type { FiberRuntime } from "./internal/fiberRuntime.js" +import * as Iterable from "./Iterable.js" import * as Option from "./Option.js" import { type Pipeable, pipeArguments } from "./Pipeable.js" import * as Predicate from "./Predicate.js" @@ -36,8 +38,14 @@ export interface FiberSet extends Pipeable, Inspectable.Inspectable, Iterable> { readonly [TypeId]: TypeId - readonly backing: Set> - readonly deferred: Deferred.Deferred + readonly deferred: Deferred.Deferred + /** @internal */ + state: { + readonly _tag: "Open" + readonly backing: Set> + } | { + readonly _tag: "Closed" + } } /** @@ -49,7 +57,10 @@ export const isFiberSet = (u: unknown): u is FiberSet => Predi const Proto = { [TypeId]: TypeId, [Symbol.iterator](this: FiberSet) { - return this.backing[Symbol.iterator]() + if (this.state._tag === "Closed") { + return Iterable.empty() + } + return this.state.backing[Symbol.iterator]() }, toString(this: FiberSet) { return Inspectable.format(this.toJSON()) @@ -57,7 +68,7 @@ const Proto = { toJSON(this: FiberSet) { return { _id: "FiberMap", - backing: Inspectable.toJSON(Array.from(this.backing)) + state: this.state } }, [Inspectable.NodeInspectSymbol](this: FiberSet) { @@ -70,10 +81,10 @@ const Proto = { const unsafeMake = ( backing: Set>, - deferred: Deferred.Deferred + deferred: Deferred.Deferred ): FiberSet => { const self = Object.create(Proto) - self.backing = backing + self.state = { _tag: "Open", backing } self.deferred = deferred return self } @@ -105,8 +116,15 @@ const unsafeMake = ( */ export const make = (): Effect.Effect, never, Scope.Scope> => Effect.acquireRelease( - Effect.map(Deferred.make(), (deferred) => unsafeMake(new Set(), deferred)), - clear + Effect.map(Deferred.make(), (deferred) => unsafeMake(new Set(), deferred)), + (set) => + Effect.zipRight( + clear(set), + Effect.suspend(() => { + set.state = { _tag: "Closed" } + return Deferred.done(set.deferred, Exit.void) + }) + ) ) /** @@ -135,24 +153,39 @@ export const makeRuntime = (): Effect.Effec * @categories combinators */ export const unsafeAdd: { - (fiber: Fiber.RuntimeFiber): (self: FiberSet) => void - (self: FiberSet, fiber: Fiber.RuntimeFiber): void -} = dual< ( - fiber: Fiber.RuntimeFiber - ) => (self: FiberSet) => void, + fiber: Fiber.RuntimeFiber, + options?: { + readonly interruptAs?: FiberId.FiberId | undefined + } | undefined + ): (self: FiberSet) => void ( self: FiberSet, - fiber: Fiber.RuntimeFiber - ) => void ->(2, (self, fiber) => { - if (self.backing.has(fiber)) { + fiber: Fiber.RuntimeFiber, + options?: { + readonly interruptAs?: FiberId.FiberId | undefined + } | undefined + ): void +} = dual((args) => isFiberSet(args[0]), ( + self: FiberSet, + fiber: Fiber.RuntimeFiber, + options?: { + readonly interruptAs?: FiberId.FiberId | undefined + } | undefined +): void => { + if (self.state._tag === "Closed") { + fiber.unsafeInterruptAsFork(options?.interruptAs ?? FiberId.none) + return + } else if (self.state.backing.has(fiber)) { return } ;(fiber as FiberRuntime).setFiberRef(FiberRef.unhandledErrorLogLevel, Option.none()) - self.backing.add(fiber) + self.state.backing.add(fiber) fiber.addObserver((exit) => { - self.backing.delete(fiber) + if (self.state._tag === "Closed") { + return + } + self.state.backing.delete(fiber) if (Exit.isFailure(exit) && !Cause.isInterruptedOnly(exit.cause)) { Deferred.unsafeDone(self.deferred, exit as any) } @@ -181,19 +214,31 @@ export const add: { self: FiberSet, fiber: Fiber.RuntimeFiber ) => Effect.Effect ->(2, (self, fiber) => Effect.sync(() => unsafeAdd(self, fiber))) +>( + 2, + (self, fiber) => + Effect.fiberIdWith((fiberId) => + Effect.sync(() => + unsafeAdd(self, fiber, { + interruptAs: fiberId + }) + ) + ) +) /** * @since 2.0.0 * @categories combinators */ export const clear = (self: FiberSet): Effect.Effect => - Effect.zipRight( - Effect.forEach(self.backing, (fiber) => Fiber.interrupt(fiber)), - Effect.sync(() => { - self.backing.clear() - }) - ) + Effect.suspend(() => { + if (self.state._tag === "Closed") { + return Effect.void + } + return Effect.forEach(self.state.backing, (fiber) => + // will be removed by the observer + Fiber.interrupt(fiber)) + }) /** * Fork an Effect and add the forked fiber to the FiberSet. @@ -214,16 +259,30 @@ export const run: { const self = arguments[0] as FiberSet if (arguments.length === 1) { return (effect: Effect.Effect) => + Effect.suspend(() => { + if (self.state._tag === "Closed") { + return Effect.interrupt + } + return Effect.uninterruptibleMask((restore) => + Effect.tap( + restore(Effect.forkDaemon(effect)), + (fiber) => add(self, fiber) + ) + ) + }) + } + const effect = arguments[1] as Effect.Effect + return Effect.suspend(() => { + if (self.state._tag === "Closed") { + return Effect.interrupt + } + return Effect.uninterruptibleMask((restore) => Effect.tap( - Effect.forkDaemon(effect), + restore(Effect.forkDaemon(effect)), (fiber) => add(self, fiber) ) - } - const effect = arguments[1] as Effect.Effect - return Effect.tap( - Effect.forkDaemon(effect), - (fiber) => add(self, fiber) - ) as any + ) + }) as any } /** @@ -281,7 +340,8 @@ export const runtime: ( * @since 2.0.0 * @categories combinators */ -export const size = (self: FiberSet): Effect.Effect => Effect.sync(() => self.backing.size) +export const size = (self: FiberSet): Effect.Effect => + Effect.sync(() => self.state._tag === "Closed" ? 0 : self.state.backing.size) /** * Join all fibers in the FiberSet. If any of the Fiber's in the set terminate with a failure, @@ -300,5 +360,5 @@ export const size = (self: FiberSet): Effect.Effect => Effec * yield* _(FiberSet.join(set)); * }); */ -export const join = (self: FiberSet): Effect.Effect => - Deferred.await(self.deferred as Deferred.Deferred) +export const join = (self: FiberSet): Effect.Effect => + Deferred.await(self.deferred as Deferred.Deferred) diff --git a/packages/effect/src/index.ts b/packages/effect/src/index.ts index 300c80e62d..9aa5f2c832 100644 --- a/packages/effect/src/index.ts +++ b/packages/effect/src/index.ts @@ -255,6 +255,11 @@ export * as Exit from "./Exit.js" */ export * as Fiber from "./Fiber.js" +/** + * @since 2.0.0 + */ +export * as FiberHandle from "./FiberHandle.js" + /** * @since 2.0.0 */ diff --git a/packages/effect/test/FiberHandle.test.ts b/packages/effect/test/FiberHandle.test.ts new file mode 100644 index 0000000000..21b2f0c432 --- /dev/null +++ b/packages/effect/test/FiberHandle.test.ts @@ -0,0 +1,53 @@ +import { Effect, Ref } from "effect" +import * as it from "effect-test/utils/extend" +import * as FiberHandle from "effect/FiberHandle" +import { assert, describe } from "vitest" + +describe("FiberHandle", () => { + it.effect("interrupts fibers", () => + Effect.gen(function*(_) { + const ref = yield* _(Ref.make(0)) + yield* _( + Effect.gen(function*(_) { + const handle = yield* _(FiberHandle.make()) + yield* _(FiberHandle.run(handle, Effect.onInterrupt(Effect.never, () => Ref.update(ref, (n) => n + 1)))) + yield* _(Effect.yieldNow()) + }), + Effect.scoped + ) + + assert.strictEqual(yield* _(Ref.get(ref)), 1) + })) + + it.effect("runtime", () => + Effect.gen(function*(_) { + const ref = yield* _(Ref.make(0)) + yield* _( + Effect.gen(function*(_) { + const handle = yield* _(FiberHandle.make()) + const run = yield* _(FiberHandle.runtime(handle)()) + run(Effect.onInterrupt(Effect.never, () => Ref.update(ref, (n) => n + 1))) + yield* _(Effect.yieldNow()) + run(Effect.onInterrupt(Effect.never, () => Ref.update(ref, (n) => n + 1))) + yield* _(Effect.yieldNow()) + run(Effect.onInterrupt(Effect.never, () => Ref.update(ref, (n) => n + 1)), { + onlyIfMissing: true + }) + yield* _(Effect.yieldNow()) + assert.strictEqual(yield* _(Ref.get(ref)), 2) + }), + Effect.scoped + ) + + assert.strictEqual(yield* _(Ref.get(ref)), 3) + })) + + it.scoped("join", () => + Effect.gen(function*(_) { + const handle = yield* _(FiberHandle.make()) + FiberHandle.unsafeSet(handle, Effect.runFork(Effect.void)) + FiberHandle.unsafeSet(handle, Effect.runFork(Effect.fail("fail"))) + const result = yield* _(FiberHandle.join(handle), Effect.flip) + assert.strictEqual(result, "fail") + })) +}) diff --git a/packages/effect/test/FiberMap.test.ts b/packages/effect/test/FiberMap.test.ts index 508dfaa8b9..fd5cb3d4d4 100644 --- a/packages/effect/test/FiberMap.test.ts +++ b/packages/effect/test/FiberMap.test.ts @@ -1,4 +1,4 @@ -import { Effect, ReadonlyArray, Ref } from "effect" +import { Effect, Exit, ReadonlyArray, Ref, Scope } from "effect" import * as it from "effect-test/utils/extend" import * as FiberMap from "effect/FiberMap" import { assert, describe } from "vitest" @@ -61,4 +61,15 @@ describe("FiberMap", () => { const result = yield* _(FiberMap.join(map), Effect.flip) assert.strictEqual(result, "fail") })) + + it.effect("size", () => + Effect.gen(function*(_) { + const scope = yield* _(Scope.make()) + const set = yield* _(FiberMap.make(), Scope.extend(scope)) + FiberMap.unsafeSet(set, "a", Effect.runFork(Effect.never)) + FiberMap.unsafeSet(set, "b", Effect.runFork(Effect.never)) + assert.strictEqual(yield* _(FiberMap.size(set)), 2) + yield* _(Scope.close(scope, Exit.void)) + assert.strictEqual(yield* _(FiberMap.size(set)), 0) + })) }) diff --git a/packages/effect/test/FiberSet.test.ts b/packages/effect/test/FiberSet.test.ts index 9f92e32a2e..d6ee1545e0 100644 --- a/packages/effect/test/FiberSet.test.ts +++ b/packages/effect/test/FiberSet.test.ts @@ -1,4 +1,4 @@ -import { Effect, ReadonlyArray, Ref } from "effect" +import { Effect, Exit, ReadonlyArray, Ref, Scope } from "effect" import * as it from "effect-test/utils/extend" import * as FiberSet from "effect/FiberSet" import { assert, describe } from "vitest" @@ -59,4 +59,15 @@ describe("FiberSet", () => { const result = yield* _(FiberSet.join(set), Effect.flip) assert.strictEqual(result, "fail") })) + + it.effect("size", () => + Effect.gen(function*(_) { + const scope = yield* _(Scope.make()) + const set = yield* _(FiberSet.make(), Scope.extend(scope)) + FiberSet.unsafeAdd(set, Effect.runFork(Effect.never)) + FiberSet.unsafeAdd(set, Effect.runFork(Effect.never)) + assert.strictEqual(yield* _(FiberSet.size(set)), 2) + yield* _(Scope.close(scope, Exit.void)) + assert.strictEqual(yield* _(FiberSet.size(set)), 0) + })) }) diff --git a/packages/experimental/src/Machine.ts b/packages/experimental/src/Machine.ts index 1fcb7051fb..f09dad13bc 100644 --- a/packages/experimental/src/Machine.ts +++ b/packages/experimental/src/Machine.ts @@ -15,7 +15,6 @@ import * as FiberRefs from "effect/FiberRefs" import * as FiberSet from "effect/FiberSet" import { dual, identity, pipe } from "effect/Function" import { globalValue } from "effect/GlobalValue" -import * as MutableHashMap from "effect/MutableHashMap" import * as Option from "effect/Option" import type { Pipeable } from "effect/Pipeable" import { pipeArguments } from "effect/Pipeable" @@ -645,12 +644,8 @@ export const boot = < (id: string): (effect: Effect.Effect) => Effect.Effect (effect: Effect.Effect, id: string): Effect.Effect } = dual(2, (effect: Effect.Effect, id: string): Effect.Effect => - Effect.suspend(() => { - if (MutableHashMap.has(fiberMap.backing, id)) { - return Effect.void - } - return forkReplace(effect, id) - })) + Effect.asVoid(FiberMap.run(fiberMap, id, MachineDefect.wrap(effect), { onlyIfMissing: true }))) + const forkOneWith: { ( id: string,