Skip to content

Commit 423946a

Browse files
committed
feat: Add support for cloud tasks
1 parent 455ca58 commit 423946a

4 files changed

Lines changed: 137 additions & 2 deletions

File tree

README.md

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,10 @@ const program = Effect.gen(function* () {
129129
const posts = yield* repo.query(Query.where('status', '==', 'published'));
130130

131131
return { postId, posts };
132-
}).pipe(Effect.provide(PostRepository), Effect.provide(Client.layerFromApp(app)));
132+
}).pipe(
133+
Effect.provide(PostRepository),
134+
Effect.provide(Client.layerFromApp(app))
135+
);
133136

134137
Effect.runPromise(program).then(console.log);
135138
```
@@ -225,6 +228,8 @@ Platform-agnostic schemas for Firestore types:
225228
- `onDocumentUpdated` - Firestore trigger
226229
- `onDocumentDeleted` - Firestore trigger
227230
- `onDocumentWritten` - Firestore trigger
231+
- `onMessagePublished` - Pub/Sub trigger
232+
- `onTaskDispatched` - Cloud Tasks trigger
228233

229234
### Planned Features
230235

packages/admin/README.md

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ Firebase Admin SDK integration for Effect Firebase. This package provides the Fi
1212
- 📝 **Cloud Logging** - Automatic logging integration
1313
- 🎯 **Type-Safe Function Handlers** - Schema validation for function inputs/outputs
1414
- 🔄 **Firestore Triggers** - Document lifecycle event handlers
15+
- 📮 **Cloud Tasks Triggers** - Typed task queue handlers with optional payload decoding
1516
- 🚀 **Effect Native** - Built on Effect's powerful composition and error handling
1617

1718
## Installation
@@ -28,7 +29,11 @@ npm install firebase-admin firebase-functions
2829
```typescript
2930
import { Effect } from 'effect';
3031
import { initializeApp } from 'firebase-admin/app';
31-
import { Admin, FunctionsRuntime, onRequestEffect } from '@effect-firebase/admin';
32+
import {
33+
Admin,
34+
FunctionsRuntime,
35+
onRequestEffect,
36+
} from '@effect-firebase/admin';
3237

3338
// Create the runtime with your layers
3439
const runtime = FunctionsRuntime.make(Admin.layerFromApp(initializeApp()));
@@ -89,6 +94,41 @@ export const createPost = onCallEffect(
8994
);
9095
```
9196

97+
### Cloud Tasks Functions (onTaskDispatched)
98+
99+
```typescript
100+
import { Effect, Schema } from 'effect';
101+
import { initializeApp } from 'firebase-admin/app';
102+
import {
103+
Admin,
104+
FunctionsRuntime,
105+
onTaskDispatchedEffect,
106+
} from '@effect-firebase/admin';
107+
108+
const runtime = FunctionsRuntime.make(Admin.layerFromApp(initializeApp()));
109+
110+
const ProcessEmailTask = Schema.Struct({
111+
email: Schema.String,
112+
template: Schema.String,
113+
});
114+
115+
export const processEmail = onTaskDispatchedEffect(
116+
{
117+
runtime,
118+
retryConfig: { maxAttempts: 5 },
119+
rateLimits: { maxConcurrentDispatches: 10 },
120+
dataSchema: ProcessEmailTask,
121+
},
122+
(task, request) =>
123+
Effect.gen(function* () {
124+
yield* Effect.log('Processing task', {
125+
email: task.email,
126+
queue: request.queueName,
127+
});
128+
})
129+
);
130+
```
131+
92132
## Cloud Logging
93133

94134
The Admin layer automatically provides Cloud Logging integration:
@@ -153,6 +193,8 @@ export const myFunction = onCallEffect(
153193
- `onDocumentUpdatedEffect` - Document updated trigger
154194
- `onDocumentDeletedEffect` - Document deleted trigger
155195
- `onDocumentWrittenEffect` - Document written (any change) trigger
196+
- `onMessagePublishedEffect` - Pub/Sub message published trigger
197+
- `onTaskDispatchedEffect` - Cloud Tasks dispatch trigger
156198
- `FunctionsRuntime.make(layer)` - Create an Effect runtime from a layer
157199
- `FunctionsRuntime.Default(app)` - Create a runtime from the provided Firebase Admin app
158200

packages/admin/src/lib/functions/functions.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,4 @@ export * from './on-document-updated.js';
77
export * from './on-document-deleted.js';
88
export * from './on-document-written.js';
99
export * from './on-message-published.js';
10+
export * from './on-task-dispatched.js';
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
import { Effect, Schema } from 'effect';
2+
import {
3+
onTaskDispatched,
4+
Request,
5+
TaskQueueFunction,
6+
TaskQueueOptions,
7+
} from 'firebase-functions/v2/tasks';
8+
import { logger } from 'firebase-functions';
9+
import { run, Runtime } from './run.js';
10+
11+
interface TaskDispatchedEffectOptions<R> extends TaskQueueOptions {
12+
runtime: Runtime<R>;
13+
}
14+
15+
interface TaskDispatchedEffectOptionsWithSchema<R, S extends Schema.Schema.Any>
16+
extends TaskDispatchedEffectOptions<R | Schema.Schema.Context<S>> {
17+
dataSchema: S;
18+
}
19+
20+
/**
21+
* Decode task payload JSON data using the provided schema.
22+
*/
23+
function decodeTaskData<S extends Schema.Schema.Any>(
24+
schema: S,
25+
request: Request<unknown>
26+
): Effect.Effect<Schema.Schema.Type<S>, Error, Schema.Schema.Context<S>> {
27+
return Schema.decodeUnknown(schema)(request.data).pipe(
28+
Effect.mapError(
29+
(error) => new Error(`Failed to decode task payload: ${error.message}`)
30+
)
31+
) as Effect.Effect<Schema.Schema.Type<S>, Error, Schema.Schema.Context<S>>;
32+
}
33+
34+
/**
35+
* Create a Firebase Functions Cloud Tasks trigger that runs an effect when a task is dispatched.
36+
*
37+
* @param options - The options for the Cloud Tasks trigger including optional payload schema.
38+
* @param handler - The handler function that runs the effect.
39+
* @returns The Firebase Functions Cloud Tasks trigger.
40+
*/
41+
// Overload: with payload schema
42+
export function onTaskDispatchedEffect<R, S extends Schema.Schema.Any, E>(
43+
options: TaskDispatchedEffectOptionsWithSchema<R, S>,
44+
handler: (
45+
data: Schema.Schema.Type<S>,
46+
request: Request<Schema.Schema.Encoded<S>>
47+
) => Effect.Effect<void, E, R>
48+
): TaskQueueFunction<Schema.Schema.Encoded<S>>;
49+
50+
// Overload: without payload schema (full control)
51+
export function onTaskDispatchedEffect<R, T, E>(
52+
options: TaskDispatchedEffectOptions<R>,
53+
handler: (request: Request<T>) => Effect.Effect<void, E, R>
54+
): TaskQueueFunction<T>;
55+
56+
// Implementation
57+
export function onTaskDispatchedEffect<R>(
58+
options: TaskDispatchedEffectOptions<R> & {
59+
dataSchema?: Schema.Schema.Any;
60+
},
61+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
62+
handler: (...args: any[]) => Effect.Effect<void, unknown, R>
63+
): TaskQueueFunction<unknown> {
64+
const { dataSchema } = options;
65+
66+
return onTaskDispatched(options, async (request) => {
67+
const effect = Effect.gen(function* () {
68+
if (dataSchema) {
69+
// Decode task payload and pass both parsed payload and request to handler
70+
const taskData = yield* decodeTaskData(dataSchema, request);
71+
return yield* handler(taskData, request);
72+
} else {
73+
// Pass raw request to handler
74+
return yield* handler(request);
75+
}
76+
});
77+
78+
await run(options.runtime, effect as Effect.Effect<void, never, R>).catch(
79+
(error) => {
80+
logger.error('Defect in onTaskDispatched', {
81+
inner: error,
82+
stack: error instanceof Error ? error.stack : undefined,
83+
});
84+
}
85+
);
86+
});
87+
}

0 commit comments

Comments
 (0)