Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/inngest-integration.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"ff-effect": patch
---

Add Inngest integration (`ff-effect/for/inngest`)
15 changes: 15 additions & 0 deletions .mcp.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"mcpServers": {
"lsp-typescript": {
"command": "mcp-language-server",
"args": [
"--workspace",
".",
"--lsp",
"typescript-language-server",
"--",
"--stdio"
]
}
}
}
639 changes: 636 additions & 3 deletions bun.lock

Large diffs are not rendered by default.

255 changes: 255 additions & 0 deletions packages/effect/docs/for/inngest.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
# Inngest

Effect wrapper for the [Inngest TypeScript SDK](https://www.inngest.com/docs). Provides type-safe, Effect-native usage of Inngest functions with step memoization, event schemas, and an HTTP handler.

## Quick Start

```ts
import { Effect } from 'effect'
import { Inngest as InngestSdk, EventSchemas } from 'inngest'
import { createInngest } from 'ff-effect/for/inngest'

const Inngest = createInngest(
Effect.succeed(
new InngestSdk({
id: 'my-app',
schemas: new EventSchemas().fromZod({
'user.signup': { data: z.object({ email: z.string() }) },
}),
})
)
)

const program = Effect.gen(function* () {
const fn = yield* Inngest.createFunction(
{ id: 'on-signup' },
{ event: 'user.signup' },
({ event, step }) => Effect.gen(function* () {
yield* step.run('send-email', () =>
Effect.tryPromise(() => sendEmail(event.data.email))
)
})
)

const handler = yield* Inngest.fetchHandler({ functions: [fn] })
Bun.serve({ fetch: handler })
})

await Effect.runPromise(program.pipe(Effect.provide(Inngest.layer)))
```

## `createInngest`

Creates an Effect-based wrapper around an Inngest client. Accepts an `Effect` that produces the client.

```ts
const Inngest = createInngest(Effect.succeed(client))
const Inngest = createInngest(Effect.succeed(client), { tagId: 'MyInngest' })
```

Returns:
- `Tag` — `Context.Tag` for the Inngest client
- `layer` — `Layer.effect(Tag, createClient)` for providing via context
- `send(payload)` — send events (see [Sending Events](#sending-events))
- `createFunction(config, trigger, handler)` — create functions (see [Creating Functions](#creating-functions))
- `httpHandler(opts)` — Effect that resolves to `HttpApp.Default` (see [HTTP Handler](#http-handler))
- `fetchHandler(opts)` — Effect that resolves to a fetch handler (see [Fetch Handler](#fetch-handler))

## Creating Functions

```ts
const fn = yield* Inngest.createFunction(
{ id: 'process-order', retries: 5 },
{ event: 'order.created' },
({ event, step, runId, attempt }) => Effect.gen(function* () {
const result = yield* step.run('validate', () =>
Effect.tryPromise(() => validateOrder(event.data.orderId))
)
yield* step.sleep('cooldown', Duration.minutes(5))
yield* step.run('fulfill', () => Effect.succeed(result))
})
)
```

Config supports all Inngest options: `id`, `name`, `retries`, `concurrency`, `throttle`, `idempotency`, `rateLimit`, `debounce`, `priority`, `batchEvents`, `cancelOn`, `timeouts`, `onFailure`.

`createFunction` returns an `Effect` — use `yield*` to extract the function. The handler receives services from the surrounding Effect context via `extract()`.

## Steps

Each Inngest step method is wrapped to return `Effect` instead of `Promise`.

### `step.run`

Run an Effect as a durable, memoized step. The callback must return `Effect<A, E, never>` (no service requirements — capture services in the outer handler scope).

```ts
yield* step.run('my-step', () =>
Effect.tryPromise(() => fetchData())
)
```

### `step.sleep`

Sleep for a duration. Accepts Effect's `Duration` input.

```ts
yield* step.sleep('wait', Duration.hours(1))
yield* step.sleep('short', Duration.seconds(30))
```

### `step.sleepUntil`

Sleep until a specific date or ISO string.

```ts
yield* step.sleepUntil('until', new Date('2024-12-31'))
yield* step.sleepUntil('until', '2024-12-31T00:00:00Z')
```

### `step.invoke`

Invoke another Inngest function.

```ts
yield* step.invoke('call-other', {
function: otherFunctionRef,
data: { key: 'value' },
})
```

### `step.waitForEvent`

Wait for a matching event with a timeout. Returns the event or `null`.

```ts
const approval = yield* step.waitForEvent('wait-approval', {
event: 'order.approved',
timeout: '1h',
match: 'data.orderId',
})
```

### `step.sendEvent`

Send events from within a step (memoized).

```ts
yield* step.sendEvent('notify', {
name: 'notification.send',
data: { message: 'Order processed' },
})
```

## Triggers

### Event trigger

```ts
Inngest.createFunction(config, { event: 'user.signup' }, handler)
Inngest.createFunction(config, { event: 'user.signup', if: 'event.data.premium == true' }, handler)
```

### Cron trigger (string)

```ts
Inngest.createFunction(config, { cron: '0 9 * * *' }, handler)
```

### Cron trigger (Effect `Cron.Cron`)

```ts
import { Cron } from 'effect'

Inngest.createFunction(config, { cron: Cron.unsafeParse('0 9 * * *') }, handler)
```

## Event Schemas

Event types flow automatically through Inngest SDK's generics:

```ts
const client = new InngestSdk({
id: 'my-app',
schemas: new EventSchemas().fromZod({
'user.signup': { data: z.object({ email: z.string() }) },
'order.created': { data: z.object({ orderId: z.string(), amount: z.number() }) },
}),
})

const Inngest = createInngest(Effect.succeed(client))

Inngest.createFunction(
{ id: 'on-signup' },
{ event: 'user.signup' },
({ event }) => Effect.gen(function* () {
// event.data.email is typed as string
console.log(event.data.email)
})
)
```

## HTTP Handler

Returns an Effect `HttpApp.Default` (from `@effect/platform`) for serving Inngest functions. Use this when composing with Effect's HTTP server stack.

```ts
const app = yield* Inngest.httpHandler({
functions: [fn1, fn2],
servePath: '/api/inngest', // optional, default: /api/inngest
})
```

### With `@effect/platform` HTTP server

```ts
import { HttpRouter, HttpServer } from '@effect/platform'

HttpRouter.empty.pipe(
HttpRouter.mountApp('/api/inngest', app),
)
```

## Fetch Handler

Returns a raw fetch handler `(Request) => Promise<Response>` for direct use with `Bun.serve` or ff-serv.

```ts
const handler = yield* Inngest.fetchHandler({
functions: [fn1, fn2],
servePath: '/api/inngest',
})
```

### With `Bun.serve`

```ts
Bun.serve({ fetch: handler })
```

### With ff-serv

```ts
import { basicHandler } from 'ff-serv/http/basic'

basicHandler(
(url) => url.pathname.startsWith('/api/inngest'),
handler,
)
```

## Sending Events

Send events outside of functions using `send()`. Requires the Inngest Tag in the Effect context.

```ts
yield* Inngest.send({ name: 'user.signup', data: { email: 'user@example.com' } })
```

Provide the client via `Inngest.layer`:

```ts
Effect.gen(function* () {
yield* Inngest.send({ name: 'user.signup', data: { email: 'user@example.com' } })
}).pipe(Effect.provide(Inngest.layer))
```
117 changes: 117 additions & 0 deletions packages/effect/e2e/inngest.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import { FetchHttpClient, HttpClient } from '@effect/platform';
import { it } from '@effect/vitest';
import { Effect, Schedule } from 'effect';
import { Inngest as InngestSdk } from 'inngest';
import { describe, expect } from 'vitest';
import { createInngest } from '../src/for/inngest';

const INNGEST_DEV_URL = 'http://localhost:8288';

function poll(predicate: () => boolean, interval: number, timeout: number) {
return Effect.suspend(() =>
predicate() ? Effect.void : Effect.fail(new Error('Condition not met')),
).pipe(Effect.retry(Schedule.spaced(interval)), Effect.timeout(timeout));
}

const devServerRunning = await Effect.runPromise(
HttpClient.get(INNGEST_DEV_URL).pipe(
Effect.map((res) => res.status < 500),
Effect.orElseSucceed(() => false),
Effect.provide(FetchHttpClient.layer),
),
);

describe.skipIf(!devServerRunning)('inngest integration', () => {
const client = new InngestSdk({ id: 'integration-test', isDev: true });
const Inngest = createInngest(Effect.succeed(client));

it.scopedLive('function executes via inngest dev server', () =>
Effect.gen(function* () {
const executionLog: string[] = [];

const fn = yield* Inngest.createFunction(
{ id: 'test-hello' },
{ event: 'test/hello' },
({ event, step }) =>
Effect.gen(function* () {
yield* step.run('log-event', () =>
Effect.sync(() => {
executionLog.push(`received: ${event.data.message}`);
}),
);
}),
);

const handler = yield* Inngest.fetchHandler({
functions: [fn],
servePath: '/api/inngest',
});

const server = Bun.serve({ port: 0, fetch: handler });

yield* Effect.addFinalizer(() => Effect.sync(() => server.stop(true)));

yield* HttpClient.put(`http://localhost:${server.port}/api/inngest`);

yield* Effect.sleep(1500);

yield* Inngest.send({ name: 'test/hello', data: { message: 'world' } });

yield* poll(() => executionLog.includes('received: world'), 500, 15_000);

expect(executionLog).toContain('received: world');
}).pipe(
Effect.provide(Inngest.layer),
Effect.provide(FetchHttpClient.layer),
),
);

it.scopedLive('step tools work correctly', () =>
Effect.gen(function* () {
const executionLog: string[] = [];

const fn = yield* Inngest.createFunction(
{ id: 'test-steps' },
{ event: 'test/steps' },
({ step }) =>
Effect.gen(function* () {
const firstResult = yield* step.run('first-step', () =>
Effect.succeed('step-one-done'),
);

yield* step.run('second-step', () =>
Effect.sync(() => {
executionLog.push(`second step got: ${firstResult}`);
}),
);
}),
);

const handler = yield* Inngest.fetchHandler({
functions: [fn],
servePath: '/api/inngest',
});

const server = Bun.serve({ port: 0, fetch: handler });

yield* Effect.addFinalizer(() => Effect.sync(() => server.stop(true)));

yield* HttpClient.put(`http://localhost:${server.port}/api/inngest`);

yield* Effect.sleep(1500);

yield* Inngest.send({ name: 'test/steps', data: {} });

yield* poll(
() => executionLog.includes('second step got: step-one-done'),
500,
15_000,
);

expect(executionLog).toContain('second step got: step-one-done');
}).pipe(
Effect.provide(Inngest.layer),
Effect.provide(FetchHttpClient.layer),
),
);
});
Loading