Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/spicy-zoos-boil.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@effect/cluster": patch
"@effect/rpc": patch
---

add disableFatalDefects option to cluster entities
5 changes: 5 additions & 0 deletions .changeset/tricky-stars-worry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@effect/workflow": patch
---

add Workflow.CaptureDefects annotation, to configure defect behaviour
11 changes: 11 additions & 0 deletions packages/cluster/src/Entity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import * as Layer from "effect/Layer"
import * as Mailbox from "effect/Mailbox"
import * as Option from "effect/Option"
import * as Predicate from "effect/Predicate"
import type * as Schedule from "effect/Schedule"
import { Scope } from "effect/Scope"
import type * as Stream from "effect/Stream"
import type {
Expand Down Expand Up @@ -127,6 +128,8 @@ export interface Entity<in out Rpcs extends Rpc.Any> extends Equal.Equal {
readonly maxIdleTime?: DurationInput | undefined
readonly concurrency?: number | "unbounded" | undefined
readonly mailboxCapacity?: number | "unbounded" | undefined
readonly disableFatalDefects?: boolean | undefined
readonly defectRetryPolicy?: Schedule.Schedule<any, unknown> | undefined
}
): Layer.Layer<
never,
Expand Down Expand Up @@ -163,6 +166,8 @@ export interface Entity<in out Rpcs extends Rpc.Any> extends Equal.Equal {
options?: {
readonly maxIdleTime?: DurationInput | undefined
readonly mailboxCapacity?: number | "unbounded" | undefined
readonly disableFatalDefects?: boolean | undefined
readonly defectRetryPolicy?: Schedule.Schedule<any, unknown> | undefined
}
): Layer.Layer<
never,
Expand Down Expand Up @@ -234,6 +239,9 @@ const Proto = {
options?: {
readonly maxIdleTime?: DurationInput | undefined
readonly concurrency?: number | "unbounded" | undefined
readonly mailboxCapacity?: number | "unbounded" | undefined
readonly disableFatalDefects?: boolean | undefined
readonly defectRetryPolicy?: Schedule.Schedule<any, unknown> | undefined
}
): Layer.Layer<
never,
Expand Down Expand Up @@ -276,6 +284,9 @@ const Proto = {
>,
options?: {
readonly maxIdleTime?: DurationInput | undefined
readonly mailboxCapacity?: number | "unbounded" | undefined
readonly disableFatalDefects?: boolean | undefined
readonly defectRetryPolicy?: Schedule.Schedule<any, unknown> | undefined
}
) {
const buildHandlers = Effect.gen(this, function*() {
Expand Down
2 changes: 2 additions & 0 deletions packages/cluster/src/Sharding.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ export class Sharding extends Context.Tag("@effect/cluster/Sharding")<Sharding,
readonly maxIdleTime?: DurationInput | undefined
readonly concurrency?: number | "unbounded" | undefined
readonly mailboxCapacity?: number | "unbounded" | undefined
readonly disableFatalDefects?: boolean | undefined
readonly defectRetryPolicy?: Schedule.Schedule<any, unknown> | undefined
}
) => Effect.Effect<
void,
Expand Down
39 changes: 26 additions & 13 deletions packages/cluster/src/internal/entityManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ export const make = Effect.fnUntraced(function*<
readonly maxIdleTime?: DurationInput | undefined
readonly concurrency?: number | "unbounded" | undefined
readonly mailboxCapacity?: number | "unbounded" | undefined
readonly disableFatalDefects?: boolean | undefined
readonly defectRetryPolicy?: Schedule.Schedule<any, unknown, never> | undefined
}
) {
const config = yield* ShardingConfig
Expand All @@ -97,6 +99,9 @@ export const make = Effect.fnUntraced(function*<
const mailboxCapacity = options.mailboxCapacity ?? config.entityMailboxCapacity
const clock = yield* Effect.clock
const context = yield* Effect.context<Rpc.Context<Rpcs> | Rpc.Middleware<Rpcs> | RX>()
const retryDriver = yield* Schedule.driver(
options.defectRetryPolicy ? Schedule.andThen(options.defectRetryPolicy, defaultRetryPolicy) : defaultRetryPolicy
)

const activeServers = new Map<EntityId, EntityState>()

Expand Down Expand Up @@ -141,6 +146,7 @@ export const make = Effect.fnUntraced(function*<
const server = yield* RpcServer.makeNoSerialization(entity.protocol, {
spanPrefix: `${entity.type}(${address.entityId})`,
concurrency: options.concurrency ?? 1,
disableFatalDefects: options.disableFatalDefects,
onFromServer(response): Effect.Effect<void> {
switch (response._tag) {
case "Exit": {
Expand Down Expand Up @@ -210,19 +216,7 @@ export const make = Effect.fnUntraced(function*<
))
}
case "Defect": {
const effect = writeRef.unsafeRebuild()
defectRequestIds = Array.from(activeRequests.keys())
return Effect.logError("Defect in entity, restarting", Cause.die(response.defect)).pipe(
Effect.andThen(effect.pipe(
Effect.tapErrorCause(Effect.logError),
Effect.retry(Schedule.spaced(500))
)),
Effect.annotateLogs({
module: "EntityManager",
address,
runner: options.runnerAddress
})
)
return Effect.forkIn(onDefect(Cause.die(response.defect)), managerScope)
}
case "ClientEnd": {
return endLatch.open
Expand Down Expand Up @@ -259,6 +253,21 @@ export const make = Effect.fnUntraced(function*<
})
)

function onDefect(cause: Cause.Cause<never>): Effect.Effect<void> {
const effect = writeRef.unsafeRebuild()
defectRequestIds = Array.from(activeRequests.keys())
return Effect.logError("Defect in entity, restarting", cause).pipe(
Effect.andThen(Effect.ignore(retryDriver.next(void 0))),
Effect.andThen(effect),
Effect.annotateLogs({
module: "EntityManager",
address,
runner: options.runnerAddress
}),
Effect.catchAllCause(onDefect)
)
}

const state: EntityState = {
address,
mailboxGauge: ClusterMetrics.mailboxSize.pipe(
Expand Down Expand Up @@ -482,6 +491,10 @@ export const make = Effect.fnUntraced(function*<
})
})

const defaultRetryPolicy = Schedule.exponential(500, 1.5).pipe(
Schedule.union(Schedule.spaced("10 seconds"))
)

const makeMessageSchema = <Rpcs extends Rpc.Any>(entity: Entity<Rpcs>): Schema.Schema<
{
readonly _tag: "IncomingRequest"
Expand Down
28 changes: 27 additions & 1 deletion packages/cluster/test/Sharding.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,21 @@ import {
Snowflake
} from "@effect/cluster"
import { assert, describe, expect, it } from "@effect/vitest"
import { Array, Cause, Chunk, Effect, Exit, Fiber, FiberId, Layer, Mailbox, Option, Stream, TestClock } from "effect"
import {
Array,
Cause,
Chunk,
Effect,
Exit,
Fiber,
FiberId,
Layer,
Mailbox,
MutableRef,
Option,
Stream,
TestClock
} from "effect"
import { TestEntity, TestEntityNoState, TestEntityState, User } from "./TestEntity.js"

describe.concurrent("Sharding", () => {
Expand Down Expand Up @@ -496,6 +510,18 @@ describe.concurrent("Sharding", () => {
)
expect(error._tag).toEqual("EntityNotManagedByRunner")
}).pipe(Effect.provide(TestShardingWithoutEntities)))

it.scoped("restart on defect", () =>
Effect.gen(function*() {
yield* TestClock.adjust(1)
const state = yield* TestEntityState
const makeClient = yield* TestEntity.client
const client = makeClient("1")
MutableRef.set(state.defectTrigger, true)
const result = yield* client.GetUser({ id: 123 })
expect(result).toEqual(new User({ id: 123, name: "User 123" }))
expect(state.layerBuilds.current).toEqual(2)
}).pipe(Effect.provide(TestSharding)))
})

const TestShardingConfig = ShardingConfig.layer({
Expand Down
101 changes: 57 additions & 44 deletions packages/cluster/test/TestEntity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import type { Envelope } from "@effect/cluster"
import { ClusterSchema, Entity } from "@effect/cluster"
import type { RpcGroup } from "@effect/rpc"
import { Rpc, RpcSchema } from "@effect/rpc"
import { Effect, Layer, Mailbox, Option, PrimaryKey, Schema, Stream } from "effect"
import { Effect, Layer, Mailbox, MutableRef, Option, PrimaryKey, Schedule, Schema, Stream } from "effect"

export class User extends Schema.Class<User>("User")({
id: Schema.Number,
Expand Down Expand Up @@ -58,61 +58,74 @@ export class TestEntityState extends Effect.Service<TestEntityState>()("TestEnti
RpcGroup.Rpcs<typeof TestEntity.protocol> extends infer R ? R extends Rpc.Any ? Envelope.Request<R> : never
: never
>()
const defectTrigger = MutableRef.make(false)
const layerBuilds = MutableRef.make(0)

return {
messages,
streamMessages,
envelopes,
interrupts
interrupts,
defectTrigger,
layerBuilds
} as const
})
}) {}

export const TestEntityNoState = TestEntity.toLayer(Effect.gen(function*() {
const state = yield* TestEntityState
export const TestEntityNoState = TestEntity.toLayer(
Effect.gen(function*() {
const state = yield* TestEntityState

const never = (envelope: any) =>
Effect.suspend(() => {
state.envelopes.unsafeOffer(envelope)
return Effect.never
}).pipe(Effect.onInterrupt(() => {
state.interrupts.unsafeOffer(envelope)
return Effect.void
}))
return {
GetUser: (envelope) =>
Effect.sync(() => {
MutableRef.update(state.layerBuilds, (count) => count + 1)

const never = (envelope: any) =>
Effect.suspend(() => {
state.envelopes.unsafeOffer(envelope)
return Effect.never
}).pipe(Effect.onInterrupt(() => {
state.interrupts.unsafeOffer(envelope)
return Effect.void
}))
return {
GetUser: (envelope) =>
Effect.sync(() => {
state.envelopes.unsafeOffer(envelope)
if (state.defectTrigger.current) {
MutableRef.set(state.defectTrigger, false)
throw new Error("User not found")
}
return new User({ id: envelope.payload.id, name: `User ${envelope.payload.id}` })
}),
GetUserVolatile: (envelope) =>
Effect.sync(() => {
state.envelopes.unsafeOffer(envelope)
return new User({ id: envelope.payload.id, name: `User ${envelope.payload.id}` })
}),
Never: never,
NeverFork: (envelope) => Rpc.fork(never(envelope)),
NeverVolatile: never,
RequestWithKey: (envelope) => {
state.envelopes.unsafeOffer(envelope)
return new User({ id: envelope.payload.id, name: `User ${envelope.payload.id}` })
}),
GetUserVolatile: (envelope) =>
Effect.sync(() => {
return Effect.orDie(state.messages.take)
},
StreamWithKey: (envelope) => {
let sequence = envelope.lastSentChunkValue.pipe(
Option.map((value) => value + 1),
Option.getOrElse(() => 0)
)
return Mailbox.toStream(state.streamMessages).pipe(
Stream.map(() => sequence++)
)
},
GetAllUsers: (envelope) => {
state.envelopes.unsafeOffer(envelope)
return new User({ id: envelope.payload.id, name: `User ${envelope.payload.id}` })
}),
Never: never,
NeverFork: (envelope) => Rpc.fork(never(envelope)),
NeverVolatile: never,
RequestWithKey: (envelope) => {
state.envelopes.unsafeOffer(envelope)
return Effect.orDie(state.messages.take)
},
StreamWithKey: (envelope) => {
let sequence = envelope.lastSentChunkValue.pipe(
Option.map((value) => value + 1),
Option.getOrElse(() => 0)
)
return Mailbox.toStream(state.streamMessages).pipe(
Stream.map(() => sequence++)
)
},
GetAllUsers: (envelope) => {
state.envelopes.unsafeOffer(envelope)
return Stream.fromIterable(envelope.payload.ids.map((id) => new User({ id, name: `User ${id}` }))).pipe(
Stream.rechunk(1)
)
return Stream.fromIterable(envelope.payload.ids.map((id) => new User({ id, name: `User ${id}` }))).pipe(
Stream.rechunk(1)
)
}
}
}
}))
}),
{ defectRetryPolicy: Schedule.forever }
)

export const TestEntityLayer = TestEntityNoState.pipe(Layer.provideMerge(TestEntityState.Default))
6 changes: 6 additions & 0 deletions packages/rpc/src/RpcServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ export const makeNoSerialization: <Rpcs extends Rpc.Any>(
readonly spanPrefix?: string | undefined
readonly disableClientAcks?: boolean | undefined
readonly concurrency?: number | "unbounded" | undefined
readonly disableFatalDefects?: boolean | undefined
}
) => Effect.Effect<
RpcServer<Rpcs>,
Expand All @@ -89,13 +90,15 @@ export const makeNoSerialization: <Rpcs extends Rpc.Any>(
readonly spanPrefix?: string | undefined
readonly disableClientAcks?: boolean | undefined
readonly concurrency?: number | "unbounded" | undefined
readonly disableFatalDefects?: boolean | undefined
}
) {
const enableTracing = options.disableTracing !== true
const enableSpanPropagation = options.disableSpanPropagation !== true
const supportsAck = options.disableClientAcks !== true
const spanPrefix = options.spanPrefix ?? "RpcServer"
const concurrency = options.concurrency ?? "unbounded"
const disableFatalDefects = options.disableFatalDefects ?? false
const context = yield* Effect.context<Rpc.ToHandler<Rpcs> | Scope.Scope>()
const scope = Context.get(context, Scope.Scope)
const fiberSet = yield* FiberSet.make()
Expand Down Expand Up @@ -261,6 +264,9 @@ export const makeNoSerialization: <Rpcs extends Rpc.Any>(
},
onFailure: (cause) => {
responded = true
if (!disableFatalDefects && Cause.isDie(cause)) {
return sendDefect(client, Cause.squash(cause))
}
return options.onFromServer({
_tag: "Exit",
clientId: client.id,
Expand Down
Loading