Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/afraid-taxis-reflect.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"effect": minor
---

Add Layer.setRandom, for over-riding the default Random service
5 changes: 5 additions & 0 deletions .changeset/fast-garlics-burn.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"effect": minor
---

Add Record.findFirst
18 changes: 18 additions & 0 deletions .changeset/funny-islands-relate.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
---
"effect": minor
---

add Stream.toAsyncIterable\* apis

```ts
import { Stream } from "effect"

// Will print:
// 1
// 2
// 3
const stream = Stream.make(1, 2, 3)
for await (const result of Stream.toAsyncIterable(stream)) {
console.log(result)
}
```
5 changes: 5 additions & 0 deletions .changeset/odd-crabs-fry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"effect": minor
---

Added Stream.filterMapEffectOption combinator for effectful filtering and mapping in a single step
5 changes: 5 additions & 0 deletions .changeset/wild-melons-scream.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@effect/platform": minor
---

Allow removing multiple Headers
11 changes: 11 additions & 0 deletions packages/effect/dtslint/Record.tst.ts
Original file line number Diff line number Diff line change
Expand Up @@ -589,4 +589,15 @@ describe("Record", () => {
expect(Record.intersection(string$structAB, { b: 2 }, (a, _) => a))
.type.toBe<Record<"b", number>>()
})

it("findFirst", () => {
expect(Record.findFirst(string$numbersOrStrings, (a, _) => predicateNumbersOrStrings(a)))
.type.toBe<Option.Option<[string, string | number]>>()
expect(pipe(string$numbersOrStrings, Record.findFirst((a, _) => predicateNumbersOrStrings(a))))
.type.toBe<Option.Option<[string, string | number]>>()
expect(Record.findFirst(string$numbersOrStrings, Predicate.isString))
.type.toBe<Option.Option<[string, string]>>()
expect(pipe(string$numbersOrStrings, Record.findFirst(Predicate.isString)))
.type.toBe<Option.Option<[string, string]>>()
})
})
11 changes: 11 additions & 0 deletions packages/effect/src/Layer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@ import * as fiberRuntime from "./internal/fiberRuntime.js"
import * as internal from "./internal/layer.js"
import * as circularLayer from "./internal/layer/circular.js"
import * as query from "./internal/query.js"
import { randomTag } from "./internal/random.js"
import type { LogLevel } from "./LogLevel.js"
import type * as Option from "./Option.js"
import type { Pipeable } from "./Pipeable.js"
import type * as Random from "./Random.js"
import type * as Request from "./Request.js"
import type * as Runtime from "./Runtime.js"
import type * as Schedule from "./Schedule.js"
Expand Down Expand Up @@ -945,6 +947,15 @@ export const setConfigProvider: (configProvider: ConfigProvider) => Layer<never>
*/
export const parentSpan: (span: Tracer.AnySpan) => Layer<Tracer.ParentSpan> = circularLayer.parentSpan

/**
* @since 3.15.0
* @category Random
*/
export const setRandom = <A extends Random.Random>(random: A): Layer<never> =>
scopedDiscard(
fiberRuntime.fiberRefLocallyScopedWith(defaultServices.currentServices, Context.add(randomTag, random))
)

/**
* @since 2.0.0
* @category requests & batching
Expand Down
44 changes: 44 additions & 0 deletions packages/effect/src/Record.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1227,3 +1227,47 @@ export const getEquivalence = <K extends string, A>(
export const singleton = <K extends string | symbol, A>(key: K, value: A): Record<K, A> => ({
[key]: value
} as any)

/**
* Returns the first entry that satisfies the specified
* predicate, or `None` if no such entry exists.
*
* @example
* ```ts
* import { Record, Option } from "effect"
*
* const record = { a: 1, b: 2, c: 3 }
* const result = Record.findFirst(record, (value, key) => value > 1 && key !== "b")
* console.log(result) // Option.Some(["c", 3])
* ```
*
* @category elements
* @since 3.14.0
*/
export const findFirst: {
<K extends string | symbol, V, V2 extends V>(
refinement: (value: NoInfer<V>, key: NoInfer<K>) => value is V2
): (self: Record<K, V>) => Option.Option<[K, V2]>
<K extends string | symbol, V>(
predicate: (value: NoInfer<V>, key: NoInfer<K>) => boolean
): (self: Record<K, V>) => Option.Option<[K, V]>
<K extends string | symbol, V, V2 extends V>(
self: Record<K, V>,
refinement: (value: NoInfer<V>, key: NoInfer<K>) => value is V2
): Option.Option<[K, V2]>
<K extends string | symbol, V>(
self: Record<K, V>,
predicate: (value: NoInfer<V>, key: NoInfer<K>) => boolean
): Option.Option<[K, V]>
} = dual(
2,
<K extends string | symbol, V>(self: Record<K, V>, f: (value: V, key: K) => boolean) => {
for (const a of Object.entries<V>(self)) {
const o = f(a[1], a[0] as K)
if (o) {
return Option.some(a)
}
}
return Option.none()
}
)
46 changes: 46 additions & 0 deletions packages/effect/src/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1666,6 +1666,24 @@ export const filterMapEffect: {
): Stream<A2, E | E2, R | R2>
} = internal.filterMapEffect

/**
* Performs an effectful filter and map in a single step. Effects that
* succeed with `Some` will yield the value in the resulting stream, effects that
* succeed with `None` will be filtered out.
*
* @since 2.0.0
* @category mapping
*/
export const filterMapEffectOption: {
<A, A2, E2, R2>(
f: (a: A) => Effect.Effect<Option.Option<A2>, E2, R2>
): <E, R>(self: Stream<A, E, R>) => Stream<A2, E | E2, R | R2>
<A, E, R, A2, E2, R2>(
self: Stream<A, E, R>,
f: (a: A) => Effect.Effect<Option.Option<A2>, E2, R2>
): Stream<A2, E | E2, R | R2>
} = internal.filterMapEffectOption

/**
* Transforms all elements of the stream for as long as the specified partial
* function is defined.
Expand Down Expand Up @@ -5332,6 +5350,34 @@ export const toReadableStreamRuntime: {
): ReadableStream<A>
} = internal.toReadableStreamRuntime

/**
* Converts the stream to a `AsyncIterable` using the provided runtime.
*
* @since 3.15.0
* @category destructors
*/
export const toAsyncIterableRuntime: {
<A, XR>(runtime: Runtime<XR>): <E, R extends XR>(self: Stream<A, E, R>) => AsyncIterable<A>
<A, E, XR, R extends XR>(self: Stream<A, E, R>, runtime: Runtime<XR>): AsyncIterable<A>
} = internal.toAsyncIterableRuntime

/**
* Converts the stream to a `AsyncIterable` capturing the required dependencies.
*
* @since 3.15.0
* @category destructors
*/
export const toAsyncIterableEffect: <A, E, R>(self: Stream<A, E, R>) => Effect.Effect<AsyncIterable<A>, never, R> =
internal.toAsyncIterableEffect

/**
* Converts the stream to a `AsyncIterable`.
*
* @since 3.15.0
* @category destructors
*/
export const toAsyncIterable: <A, E>(self: Stream<A, E>) => AsyncIterable<A> = internal.toAsyncIterable

/**
* Applies the transducer to the stream and emits its outputs.
*
Expand Down
87 changes: 87 additions & 0 deletions packages/effect/src/internal/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2597,6 +2597,25 @@ export const filterMapEffect = dual<
})
)

/** @internal */
export const filterMapEffectOption = dual<
<A, A2, E2, R2>(
f: (a: A) => Effect.Effect<Option.Option<A2>, E2, R2>
) => <E, R>(self: Stream.Stream<A, E, R>) => Stream.Stream<A2, E | E2, R | R2>,
<A, E, R, A2, E2, R2>(
self: Stream.Stream<A, E, R>,
f: (a: A) => Effect.Effect<Option.Option<A2>, E2, R2>
) => Stream.Stream<A2, E | E2, R | R2>
>(2, <A, E, R, A2, E2, R2>(
self: Stream.Stream<A, E, R>,
f: (a: A) => Effect.Effect<Option.Option<A2>, E2, R2>
): Stream.Stream<A2, E | E2, R | R2> =>
pipe(
self,
mapEffectSequential(f),
filterMap(identity)
))

/** @internal */
export const filterMapWhile = dual<
<A, A2>(
Expand Down Expand Up @@ -7200,6 +7219,74 @@ export const transduce = dual<
}
)

/** @internal */
export const toAsyncIterableRuntime = dual<
<A, XR>(
runtime: Runtime.Runtime<XR>
) => <E, R extends XR>(self: Stream.Stream<A, E, R>) => AsyncIterable<A>,
<A, E, XR, R extends XR>(
self: Stream.Stream<A, E, R>,
runtime: Runtime.Runtime<XR>
) => AsyncIterable<A>
>(
(args) => isStream(args[0]),
<A, E, XR, R extends XR>(
self: Stream.Stream<A, E, R>,
runtime: Runtime.Runtime<XR>
): AsyncIterable<A> => {
const runFork = Runtime.runFork(runtime)
return {
[Symbol.asyncIterator]() {
let currentResolve: ((value: IteratorResult<A>) => void) | undefined = undefined
let currentReject: ((reason: any) => void) | undefined = undefined
let fiber: Fiber.RuntimeFiber<void, E> | undefined = undefined
const latch = Effect.unsafeMakeLatch(false)
return {
next() {
if (!fiber) {
fiber = runFork(runForEach(self, (value) =>
latch.whenOpen(Effect.sync(() => {
latch.unsafeClose()
currentResolve!({ done: false, value })
currentResolve = currentReject = undefined
}))))
fiber.addObserver((exit) => {
fiber = Effect.runFork(latch.whenOpen(Effect.sync(() => {
if (exit._tag === "Failure") {
currentReject!(Cause.squash(exit.cause))
} else {
currentResolve!({ done: true, value: void 0 })
}
currentResolve = currentReject = undefined
})))
})
}
return new Promise<IteratorResult<A>>((resolve, reject) => {
currentResolve = resolve
currentReject = reject
latch.unsafeOpen()
})
},
return() {
if (!fiber) return Promise.resolve({ done: true, value: void 0 })
return Effect.runPromise(Effect.as(Fiber.interrupt(fiber), { done: true, value: void 0 }))
}
}
}
}
}
)

/** @internal */
export const toAsyncIterable = <A, E>(self: Stream.Stream<A, E>): AsyncIterable<A> =>
toAsyncIterableRuntime(self, Runtime.defaultRuntime)

/** @internal */
export const toAsyncIterableEffect = <A, E, R>(
self: Stream.Stream<A, E, R>
): Effect.Effect<AsyncIterable<A>, never, R> =>
Effect.map(Effect.runtime<R>(), (runtime) => toAsyncIterableRuntime(self, runtime))

/** @internal */
export const unfold = <S, A>(s: S, f: (s: S) => Option.Option<readonly [A, S]>): Stream.Stream<A> =>
unfoldChunk(s, (s) => pipe(f(s), Option.map(([a, s]) => [Chunk.of(a), s])))
Expand Down
26 changes: 26 additions & 0 deletions packages/effect/test/Record.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -373,5 +373,31 @@ describe("Record", () => {
it("mapEntries", () => {
deepStrictEqual(pipe(stringRecord, Record.mapEntries((a, key) => [key.toUpperCase(), a + 1])), { A: 2 })
})

describe("findFirst", () => {
it("refinement/predicate", () => {
const record = {
a: 1,
b: 2,
c: 1
}
deepStrictEqual(
pipe(record, Record.findFirst((v) => v < 2)),
Option.some(["a", 1])
)
deepStrictEqual(
pipe(record, Record.findFirst((v, k) => v < 2 && k !== "a")),
Option.some(["c", 1])
)
deepStrictEqual(
pipe(record, Record.findFirst((v) => v > 2)),
Option.none()
)
deepStrictEqual(
Record.findFirst(record, (v) => v < 2),
Option.some(["a", 1])
)
})
})
})
})
9 changes: 9 additions & 0 deletions packages/effect/test/Stream/conversions.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,13 @@ describe("Stream", () => {
)
deepStrictEqual(queue, Exit.die(new Cause.RuntimeException("die")))
}))

it("toAsyncIterable", async () => {
const stream = Stream.make(1, 2, 3)
const results: Array<number> = []
for await (const result of Stream.toAsyncIterable(stream)) {
results.push(result)
}
deepStrictEqual(results, [1, 2, 3])
})
})
Loading