From bb0082163134787c8b229bdfb73da6003405348a Mon Sep 17 00:00:00 2001 From: Thomas Bouldin Date: Fri, 5 Nov 2021 07:30:15 -0700 Subject: [PATCH 1/3] Add TQ support --- CHANGELOG.md | 1 + spec/common/providers/https.spec.ts | 326 ++++++++++++++++++++++------ spec/v1/providers/https.spec.ts | 107 +++++++++ spec/v2/providers/https.spec.ts | 136 ++++++++++++ src/common/encoding.ts | 6 + src/common/providers/https.ts | 240 +++++++++++++++----- src/function-builder.ts | 8 + src/handler-builder.ts | 16 ++ src/providers/https.ts | 92 +++++++- src/v2/providers/https.ts | 98 ++++++++- 10 files changed, 904 insertions(+), 126 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e69de29bb..f41e7f57d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -0,0 +1 @@ +- Parallelizes network calls that occur when validating authorization for onCall handlers. diff --git a/spec/common/providers/https.spec.ts b/spec/common/providers/https.spec.ts index 0e957a255..c0a1c7767 100644 --- a/spec/common/providers/https.spec.ts +++ b/spec/common/providers/https.spec.ts @@ -4,8 +4,16 @@ import * as firebase from 'firebase-admin'; import * as sinon from 'sinon'; import { apps as appsNamespace } from '../../../src/apps'; -import * as https from '../../../src/common/providers/https'; import * as debug from '../../../src/common/debug'; +import * as https from '../../../src/common/providers/https'; +import { + CallableContext, + CallableRequest, + TaskContext, + TaskRequest, + unsafeDecodeAppCheckToken, + unsafeDecodeIdToken, +} from '../../../src/common/providers/https'; import * as mocks from '../../fixtures/credential/key.json'; import { expectedResponseHeaders, @@ -17,12 +25,6 @@ import { mockFetchPublicKeys, mockRequest, } from '../../fixtures/mockrequest'; -import { - CallableContext, - CallableRequest, - unsafeDecodeAppCheckToken, - unsafeDecodeIdToken, -} from '../../../src/common/providers/https'; /** * RunHandlerResult contains the data from an express.Response. @@ -115,7 +117,7 @@ function runHandler( } // Runs a CallTest test. -async function runTest(test: CallTest): Promise { +async function runCallableTest(test: CallTest): Promise { const opts = { cors: { origin: true, methods: 'POST' }, ...test.callableOption, @@ -143,8 +145,50 @@ async function runTest(test: CallTest): Promise { expect(responseV2.status).to.equal(test.expectedHttpResponse.status); } +/** Represents a test case for a Task Queue Function */ +interface TaskTest { + // An http request, mocking a subset of https.Request. + httpRequest: any; + + // The expected format of the request passed to the handler. + expectedData: any; + + taskFunction?: ( + data: any, + context: https.TaskContext + ) => void | Promise; + + taskFunction2?: (request: https.TaskRequest) => void | Promise; + + // The expected shape of the http response returned to the callable SDK. + expectedStatus: number; +} + +// Runs a TaskTest test. +async function runTaskTest(test: TaskTest): Promise { + const taskQueueFunctionV1 = https.onEnqueueHandler((data, context) => { + expect(data).to.deep.equal(test.expectedData); + if (test.taskFunction) { + test.taskFunction(data, context); + } + }); + + const responseV1 = await runHandler(taskQueueFunctionV1, test.httpRequest); + expect(responseV1.status).to.equal(test.expectedStatus); + + const taskQueueFunctionV2 = https.onEnqueueHandler((request) => { + expect(request.data).to.deep.equal(test.expectedData); + if (test.taskFunction2) { + test.taskFunction2(request); + } + }); + + const responseV2 = await runHandler(taskQueueFunctionV2, test.httpRequest); + expect(responseV2.status).to.equal(test.expectedStatus); +} + function checkAuthContext( - context: CallableContext, + context: CallableContext | CallableRequest | TaskContext | TaskRequest, projectId: string, userId: string ) { @@ -154,11 +198,15 @@ function checkAuthContext( expect(context.auth.token.uid).to.equal(userId); expect(context.auth.token.sub).to.equal(userId); expect(context.auth.token.aud).to.equal(projectId); - expect(context.instanceIdToken).to.be.undefined; + + // TaskContext & TaskRequest don't have instanceIdToken + if ({}.hasOwnProperty.call(context, 'instanceIdToken')) { + expect((context as CallableContext).instanceIdToken).to.be.undefined; + } } function checkAppCheckContext( - context: CallableContext, + context: CallableContext | CallableRequest, projectId: string, appId: string ) { @@ -172,35 +220,6 @@ function checkAppCheckContext( expect(context.instanceIdToken).to.be.undefined; } -function checkAuthRequest( - request: CallableRequest, - projectId: string, - userId: string -) { - expect(request.auth).to.not.be.undefined; - expect(request.auth).to.not.be.null; - expect(request.auth.uid).to.equal(userId); - expect(request.auth.token.uid).to.equal(userId); - expect(request.auth.token.sub).to.equal(userId); - expect(request.auth.token.aud).to.equal(projectId); - expect(request.instanceIdToken).to.be.undefined; -} - -function checkAppCheckRequest( - request: CallableRequest, - projectId: string, - appId: string -) { - expect(request.app).to.not.be.undefined; - expect(request.app).to.not.be.null; - expect(request.app.appId).to.equal(appId); - expect(request.app.token.app_id).to.be.equal(appId); - expect(request.app.token.sub).to.be.equal(appId); - expect(request.app.token.aud).to.be.deep.equal([`projects/${projectId}`]); - expect(request.auth).to.be.undefined; - expect(request.instanceIdToken).to.be.undefined; -} - describe('onCallHandler', () => { let app: firebase.app.App; @@ -231,7 +250,7 @@ describe('onCallHandler', () => { }); it('should handle success', () => { - return runTest({ + return runCallableTest({ httpRequest: mockRequest({ foo: 'bar' }), expectedData: { foo: 'bar' }, callableFunction: (data, context) => ({ baz: 'qux' }), @@ -245,7 +264,7 @@ describe('onCallHandler', () => { }); it('should handle null data and return', () => { - return runTest({ + return runCallableTest({ httpRequest: mockRequest(null), expectedData: null, callableFunction: (data, context) => null, @@ -259,7 +278,7 @@ describe('onCallHandler', () => { }); it('should handle void return', () => { - return runTest({ + return runCallableTest({ httpRequest: mockRequest(null), expectedData: null, callableFunction: (data, context) => { @@ -279,7 +298,7 @@ describe('onCallHandler', () => { it('should reject bad method', () => { const req = mockRequest(null); req.method = 'GET'; - return runTest({ + return runCallableTest({ httpRequest: req, expectedData: null, callableFunction: (data, context) => { @@ -299,7 +318,7 @@ describe('onCallHandler', () => { }); it('should ignore charset', () => { - return runTest({ + return runCallableTest({ httpRequest: mockRequest(null, 'application/json; charset=utf-8'), expectedData: null, callableFunction: (data, context) => { @@ -317,7 +336,7 @@ describe('onCallHandler', () => { }); it('should reject bad content type', () => { - return runTest({ + return runCallableTest({ httpRequest: mockRequest(null, 'text/plain'), expectedData: null, callableFunction: (data, context) => { @@ -339,7 +358,7 @@ describe('onCallHandler', () => { it('should reject extra body fields', () => { const req = mockRequest(null); req.body.extra = 'bad'; - return runTest({ + return runCallableTest({ httpRequest: req, expectedData: null, callableFunction: (data, context) => { @@ -359,7 +378,7 @@ describe('onCallHandler', () => { }); it('should handle unhandled error', () => { - return runTest({ + return runCallableTest({ httpRequest: mockRequest(null), expectedData: null, callableFunction: (data, context) => { @@ -377,7 +396,7 @@ describe('onCallHandler', () => { }); it('should handle unknown error status', () => { - return runTest({ + return runCallableTest({ httpRequest: mockRequest(null), expectedData: null, callableFunction: (data, context) => { @@ -395,7 +414,7 @@ describe('onCallHandler', () => { }); it('should handle well-formed error', () => { - return runTest({ + return runCallableTest({ httpRequest: mockRequest(null), expectedData: null, callableFunction: (data, context) => { @@ -416,7 +435,7 @@ describe('onCallHandler', () => { const mock = mockFetchPublicKeys(); const projectId = appsNamespace().admin.options.projectId; const idToken = generateIdToken(projectId); - await runTest({ + await runCallableTest({ httpRequest: mockRequest(null, 'application/json', { authorization: 'Bearer ' + idToken, }), @@ -426,7 +445,7 @@ describe('onCallHandler', () => { return null; }, callableFunction2: (request) => { - checkAuthRequest(request, projectId, mocks.user_id); + checkAuthContext(request, projectId, mocks.user_id); return null; }, expectedHttpResponse: { @@ -441,7 +460,7 @@ describe('onCallHandler', () => { it('should reject bad auth', async () => { const projectId = appsNamespace().admin.options.projectId; const idToken = generateUnsignedIdToken(projectId); - await runTest({ + await runCallableTest({ httpRequest: mockRequest(null, 'application/json', { authorization: 'Bearer ' + idToken, }), @@ -470,7 +489,7 @@ describe('onCallHandler', () => { const projectId = appsNamespace().admin.options.projectId; const appId = '123:web:abc'; const appCheckToken = generateAppCheckToken(projectId, appId); - await runTest({ + await runCallableTest({ httpRequest: mockRequest(null, 'application/json', { appCheckToken }), expectedData: null, callableFunction: (data, context) => { @@ -478,7 +497,7 @@ describe('onCallHandler', () => { return null; }, callableFunction2: (request) => { - checkAppCheckRequest(request, projectId, appId); + checkAppCheckContext(request, projectId, appId); return null; }, expectedHttpResponse: { @@ -494,7 +513,7 @@ describe('onCallHandler', () => { const projectId = appsNamespace().admin.options.projectId; const appId = '123:web:abc'; const appCheckToken = generateUnsignedAppCheckToken(projectId, appId); - await runTest({ + await runCallableTest({ httpRequest: mockRequest(null, 'application/json', { appCheckToken }), expectedData: null, callableFunction: (data, context) => { @@ -517,7 +536,7 @@ describe('onCallHandler', () => { }); it('should handle bad AppCheck token with callable option', async () => { - await runTest({ + await runCallableTest({ httpRequest: mockRequest(null, 'application/json', { appCheckToken: 'FAKE', }), @@ -541,7 +560,7 @@ describe('onCallHandler', () => { }); it('should handle instance id', async () => { - await runTest({ + await runCallableTest({ httpRequest: mockRequest(null, 'application/json', { instanceIdToken: 'iid-token', }), @@ -566,7 +585,7 @@ describe('onCallHandler', () => { it('should expose raw request', async () => { const mockReq = mockRequest(null, 'application/json', {}); - await runTest({ + await runCallableTest({ httpRequest: mockReq, expectedData: null, callableFunction: (data, context) => { @@ -602,7 +621,7 @@ describe('onCallHandler', () => { it('should skip auth token verification', async () => { const projectId = appsNamespace().admin.options.projectId; const idToken = generateUnsignedIdToken(projectId); - await runTest({ + await runCallableTest({ httpRequest: mockRequest(null, 'application/json', { authorization: 'Bearer ' + idToken, }), @@ -612,7 +631,7 @@ describe('onCallHandler', () => { return null; }, callableFunction2: (request) => { - checkAuthRequest(request, projectId, mocks.user_id); + checkAuthContext(request, projectId, mocks.user_id); return null; }, expectedHttpResponse: { @@ -627,7 +646,7 @@ describe('onCallHandler', () => { const projectId = appsNamespace().admin.options.projectId; const appId = '123:web:abc'; const appCheckToken = generateUnsignedAppCheckToken(projectId, appId); - await runTest({ + await runCallableTest({ httpRequest: mockRequest(null, 'application/json', { appCheckToken }), expectedData: null, callableFunction: (data, context) => { @@ -635,7 +654,7 @@ describe('onCallHandler', () => { return null; }, callableFunction2: (request) => { - checkAppCheckRequest(request, projectId, appId); + checkAppCheckContext(request, projectId, appId); return null; }, expectedHttpResponse: { @@ -648,6 +667,187 @@ describe('onCallHandler', () => { }); }); +describe('onEnqueueHandler', () => { + let app: firebase.app.App; + + before(() => { + const credential = { + getAccessToken: () => { + return Promise.resolve({ + expires_in: 1000, + access_token: 'fake', + }); + }, + getCertificate: () => { + return { + projectId: 'aProjectId', + }; + }, + }; + app = firebase.initializeApp({ + projectId: 'aProjectId', + credential, + }); + Object.defineProperty(appsNamespace(), 'admin', { get: () => app }); + }); + + after(() => { + app.delete(); + delete appsNamespace.singleton; + }); + + it('should handle success', () => { + return runTaskTest({ + httpRequest: mockRequest({ foo: 'bar' }), + expectedData: { foo: 'bar' }, + expectedStatus: 204, + }); + }); + + it('should reject bad method', () => { + const req = mockRequest(null); + req.method = 'GET'; + return runTaskTest({ + httpRequest: req, + expectedData: null, + expectedStatus: 400, + }); + }); + + it('should ignore charset', () => { + return runTaskTest({ + httpRequest: mockRequest(null, 'application/json; charset=utf-8'), + expectedData: null, + expectedStatus: 204, + }); + }); + + it('should reject bad content type', () => { + return runTaskTest({ + httpRequest: mockRequest(null, 'text/plain'), + expectedData: null, + expectedStatus: 400, + }); + }); + + it('should reject extra body fields', () => { + const req = mockRequest(null); + req.body.extra = 'bad'; + return runTaskTest({ + httpRequest: req, + expectedData: null, + expectedStatus: 400, + }); + }); + + it('should handle unhandled error', () => { + return runTaskTest({ + httpRequest: mockRequest(null), + expectedData: null, + taskFunction: (data, context) => { + throw new Error(`ceci n'est pas une error`); + }, + taskFunction2: (request) => { + throw new Error(`cece n'est pas une error`); + }, + expectedStatus: 500, + }); + }); + + it('should handle unknown error status', () => { + return runTaskTest({ + httpRequest: mockRequest(null), + expectedData: null, + taskFunction: (data, context) => { + throw new https.HttpsError('THIS_IS_NOT_VALID' as any, 'nope'); + }, + taskFunction2: (request) => { + throw new https.HttpsError('THIS_IS_NOT_VALID' as any, 'nope'); + }, + expectedStatus: 500, + }); + }); + + it('should handle well-formed error', () => { + return runTaskTest({ + httpRequest: mockRequest(null), + expectedData: null, + taskFunction: (data, context) => { + throw new https.HttpsError('not-found', 'i am error'); + }, + taskFunction2: (request) => { + throw new https.HttpsError('not-found', 'i am error'); + }, + expectedStatus: 404, + }); + }); + + it('should handle auth', async () => { + const mock = mockFetchPublicKeys(); + const projectId = appsNamespace().admin.options.projectId; + const idToken = generateIdToken(projectId); + await runTaskTest({ + httpRequest: mockRequest(null, 'application/json', { + authorization: 'Bearer ' + idToken, + }), + expectedData: null, + taskFunction: (data, context) => { + checkAuthContext(context, projectId, mocks.user_id); + return null; + }, + taskFunction2: (request) => { + checkAuthContext(request, projectId, mocks.user_id); + return null; + }, + expectedStatus: 204, + }); + mock.done(); + }); + + it('should reject bad auth', async () => { + const projectId = appsNamespace().admin.options.projectId; + const idToken = generateUnsignedIdToken(projectId); + await runTaskTest({ + httpRequest: mockRequest(null, 'application/json', { + authorization: 'Bearer ' + idToken, + }), + expectedData: null, + expectedStatus: 401, + }); + }); + + describe('skip token verification debug mode support', () => { + before(() => { + sinon + .stub(debug, 'isDebugFeatureEnabled') + .withArgs('skipTokenVerification') + .returns(true); + }); + + after(() => { + sinon.verifyAndRestore(); + }); + + it('should skip auth token verification', async () => { + const projectId = appsNamespace().admin.options.projectId; + const idToken = generateUnsignedIdToken(projectId); + await runTaskTest({ + httpRequest: mockRequest(null, 'application/json', { + authorization: 'Bearer ' + idToken, + }), + expectedData: null, + taskFunction: (data, context) => { + checkAuthContext(context, projectId, mocks.user_id); + }, + taskFunction2: (request) => { + checkAuthContext(request, projectId, mocks.user_id); + }, + expectedStatus: 204, + }); + }); + }); +}); + describe('encoding/decoding', () => { it('encodes null', () => { expect(https.encode(null)).to.be.null; diff --git a/spec/v1/providers/https.spec.ts b/spec/v1/providers/https.spec.ts index 025c99a7a..e8c4785c6 100644 --- a/spec/v1/providers/https.spec.ts +++ b/spec/v1/providers/https.spec.ts @@ -135,6 +135,13 @@ describe('handler namespace', () => { expect(result.__trigger).to.deep.equal({}); }); }); + + describe('#onEnqueue', () => { + it('should return an empty trigger', () => { + const result = functions.handler.https.taskQueue.onEnqueue(() => null); + expect(result.__trigger).to.deep.equal({}); + }); + }); }); describe('#onCall', () => { @@ -201,6 +208,106 @@ describe('#onCall', () => { }); }); +describe('#onEnqueue', () => { + it('should return a Trigger with appropriate values', () => { + const result = https + .taskQueue({ + rateLimits: { + maxBurstSize: 20, + maxConcurrentDispatches: 30, + maxDispatchesPerSecond: 40, + }, + retryConfig: { + maxAttempts: 5, + maxBackoffSeconds: 20, + maxDoublings: 3, + minBackoffSeconds: 5, + }, + invoker: 'private', + }) + .onEnqueue(() => {}); + expect(result.__trigger).to.deep.equal({ + taskQueueTrigger: { + rateLimits: { + maxBurstSize: 20, + maxConcurrentDispatches: 30, + maxDispatchesPerSecond: 40, + }, + retryConfig: { + maxAttempts: 5, + maxBackoffSeconds: 20, + maxDoublings: 3, + minBackoffSeconds: 5, + }, + invoker: ['private'], + }, + }); + }); + + it('should allow both region and runtime options to be set', () => { + const fn = functions + .region('us-east1') + .runWith({ + timeoutSeconds: 90, + memory: '256MB', + }) + .https.taskQueue({ retryConfig: { maxAttempts: 5 } }) + .onEnqueue(() => null); + + expect(fn.__trigger).to.deep.equal({ + regions: ['us-east1'], + availableMemoryMb: 256, + timeout: '90s', + taskQueueTrigger: { + retryConfig: { + maxAttempts: 5, + }, + }, + }); + }); + + it('has a .run method', async () => { + const data = 'data'; + const context = { + auth: { + uid: 'abc', + token: 'token' as any, + }, + }; + let done = false; + const cf = https.taskQueue().onEnqueue((d, c) => { + expect(d).to.equal(data); + expect(c).to.deep.equal(context); + done = true; + }); + + await cf.run(data, context); + expect(done).to.be.true; + }); + + // Regression test for firebase-functions#947 + it('should lock to the v1 API even with function.length == 1', async () => { + let gotData: Record; + const func = https.taskQueue().onEnqueue((data) => { + gotData = data; + }); + + const req = new MockRequest( + { + data: { foo: 'bar' }, + }, + { + 'content-type': 'application/json', + } + ); + req.method = 'POST'; + + const response = await runHandler(func, req as any); + expect(response.status).to.equal(204); + expect(gotData).to.deep.equal({ foo: 'bar' }); + }); +}); + describe('callable CORS', () => { it('handles OPTIONS preflight', async () => { const func = https.onCall((data, context) => { diff --git a/spec/v2/providers/https.spec.ts b/spec/v2/providers/https.spec.ts index a28cb1c9c..d815d43e2 100644 --- a/spec/v2/providers/https.spec.ts +++ b/spec/v2/providers/https.spec.ts @@ -366,3 +366,139 @@ describe('onCall', () => { https.onCall((request: https.CallableRequest) => `Hello, ${request.data}`); }); }); + +describe('onTaskEnqueue', () => { + beforeEach(() => { + options.setGlobalOptions({}); + process.env.GCLOUD_PROJECT = 'aProject'; + }); + + afterEach(() => { + delete process.env.GCLOUD_PROJECT; + }); + + it('should return a minimal trigger with appropriate values', () => { + const result = https.onTaskEnqueue(() => {}); + expect(result.__trigger).to.deep.equal({ + apiVersion: 2, + platform: 'gcfv2', + taskQueueTrigger: {}, + labels: {}, + }); + }); + + it('should create a complex trigger with appropriate values', () => { + const result = https.onTaskEnqueue( + { + ...FULL_OPTIONS, + retryConfig: { + maxAttempts: 4, + maxDoublings: 3, + minBackoffSeconds: 1, + maxBackoffSeconds: 2, + }, + rateLimits: { + maxBurstSize: 10, + maxConcurrentDispatches: 5, + maxDispatchesPerSecond: 10, + }, + invoker: 'private', + }, + () => {} + ); + expect(result.__trigger).to.deep.equal({ + ...FULL_TRIGGER, + taskQueueTrigger: { + retryConfig: { + maxAttempts: 4, + maxDoublings: 3, + minBackoffSeconds: 1, + maxBackoffSeconds: 2, + }, + rateLimits: { + maxBurstSize: 10, + maxConcurrentDispatches: 5, + maxDispatchesPerSecond: 10, + }, + invoker: ['private'], + }, + }); + }); + + it('should merge options and globalOptions', () => { + options.setGlobalOptions({ + concurrency: 20, + region: 'europe-west1', + minInstances: 1, + }); + + const result = https.onTaskEnqueue( + { + region: 'us-west1', + minInstances: 3, + }, + (request) => {} + ); + + expect(result.__trigger).to.deep.equal({ + apiVersion: 2, + platform: 'gcfv2', + taskQueueTrigger: {}, + concurrency: 20, + minInstances: 3, + regions: ['us-west1'], + labels: {}, + }); + }); + + it('has a .run method', async () => { + const request: any = { + data: 'data', + auth: { + uid: 'abc', + token: 'token', + }, + }; + const cf = https.onTaskEnqueue((r) => { + expect(r.data).to.deep.equal(request.data); + expect(r.auth).to.deep.equal(request.auth); + }); + + await cf.run(request); + }); + + it('should be an express handler', async () => { + const func = https.onTaskEnqueue((request) => {}); + + const req = new MockRequest( + { + data: {}, + }, + { + 'content-type': 'application/json', + origin: 'example.com', + } + ); + req.method = 'POST'; + + const resp = await runHandler(func, req as any); + expect(resp.status).to.equal(204); + }); + + // These tests pass if the code transpiles + it('allows desirable syntax', () => { + https.onTaskEnqueue((request: https.TaskRequest) => { + // There should be no lint warnings that data is not a string. + console.log(`hello, ${request.data}`); + }); + https.onTaskEnqueue((request: https.TaskRequest) => { + console.log(`hello, ${request.data}`); + }); + https.onTaskEnqueue((request: https.TaskRequest) => { + console.log(`hello, ${request.data}`); + }); + https.onTaskEnqueue((request: https.TaskRequest) => { + console.log(`Hello, ${request.data}`); + }); + }); +}); diff --git a/src/common/encoding.ts b/src/common/encoding.ts index 0cbc2d50f..8960b5ddc 100644 --- a/src/common/encoding.ts +++ b/src/common/encoding.ts @@ -21,6 +21,9 @@ export function copyIfPresent( src: Src, ...fields: Array ) { + if (!src) { + return; + } for (const field of fields) { if (!Object.prototype.hasOwnProperty.call(src, field)) { continue; @@ -38,6 +41,9 @@ export function convertIfPresent( return from; } ) { + if (!src) { + return; + } if (!Object.prototype.hasOwnProperty.call(src, srcField)) { return; } diff --git a/src/common/providers/https.ts b/src/common/providers/https.ts index fa64116cd..dfece442a 100644 --- a/src/common/providers/https.ts +++ b/src/common/providers/https.ts @@ -169,6 +169,55 @@ export interface CallableRequest { rawRequest: Request; } +/** How a task should be retried in the event of a non-2xx return. */ +export interface TaskRetryConfig { + // If left unspecified, will default to 5 + maxAttempts?: number; + + // If left unspecified will default to 1hr + maxBackoffSeconds?: number; + + // If left unspecified will default to 16 + maxDoublings?: number; + + // If left unspecified will default to 100ms + minBackoffSeconds?: number; +} + +/** How congestion control should be applied to the function. */ +export interface TaskRateLimits { + // If left unspecified, will default to 100 + maxBurstSize?: number; + + // If left unspecified, wild default to 1000 + maxConcurrentDispatches?: number; + + // If left unspecified, will default to 500 + maxDispatchesPerSecond?: number; +} + +export interface TaskContext { + /** + * The result of decoding and verifying an ODIC token. + */ + auth?: AuthData; +} + +/** + * The request used to call a Task Queue function. + */ +export interface TaskRequest { + /** + * The parameters used by a client when calling this function. + */ + data: T; + + /** + * The result of decoding and verifying an ODIC token. + */ + auth?: AuthData; +} + /** * The set of Firebase Functions status codes. The codes are the same at the * ones exposed by gRPC here: @@ -561,68 +610,23 @@ export function unsafeDecodeAppCheckToken(token: string): DecodedAppCheckToken { * @param {CallableContext} ctx - Context to be sent to callable function handler. * @return {CallableTokenStatus} Status of the token verifications. */ -/** @hidden */ async function checkTokens( req: Request, ctx: CallableContext ): Promise { const verifications: CallableTokenStatus = { - app: 'MISSING', - auth: 'MISSING', + auth: 'INVALID', + app: 'INVALID', }; - const skipTokenVerify = isDebugFeatureEnabled('skipTokenVerification'); - - const appCheck = req.header('X-Firebase-AppCheck'); - if (appCheck) { - verifications.app = 'INVALID'; - try { - if (!apps().admin.appCheck) { - throw new Error( - 'Cannot validate AppCheck token. Please update Firebase Admin SDK to >= v9.8.0' - ); - } - let appCheckData; - if (skipTokenVerify) { - const decodedToken = unsafeDecodeAppCheckToken(appCheck); - appCheckData = { appId: decodedToken.app_id, token: decodedToken }; - } else { - appCheckData = await apps() - .admin.appCheck() - .verifyToken(appCheck); - } - ctx.app = appCheckData; - verifications.app = 'VALID'; - } catch (err) { - logger.warn('Failed to validate AppCheck token.', err); - } - } - - const authorization = req.header('Authorization'); - if (authorization) { - verifications.auth = 'INVALID'; - const match = authorization.match(/^Bearer (.*)$/); - if (match) { - const idToken = match[1]; - try { - let authToken: firebase.auth.DecodedIdToken; - if (skipTokenVerify) { - authToken = unsafeDecodeIdToken(idToken); - } else { - authToken = await apps() - .admin.auth() - .verifyIdToken(idToken); - } - verifications.auth = 'VALID'; - ctx.auth = { - uid: authToken.uid, - token: authToken, - }; - } catch (err) { - logger.warn('Failed to validate auth token.', err); - } - } - } + await Promise.all([ + Promise.resolve().then(async () => { + verifications.auth = await checkAuthToken(req, ctx); + }), + Promise.resolve().then(async () => { + verifications.app = await checkAppCheckToken(req, ctx); + }), + ]); const logPayload = { verifications, @@ -651,8 +655,78 @@ async function checkTokens( return verifications; } -type v1Handler = (data: any, context: CallableContext) => any | Promise; -type v2Handler = (request: CallableRequest) => Res; +/** @hidden */ +async function checkAuthToken( + req: Request, + ctx: CallableContext | TaskContext +): Promise { + const authorization = req.header('Authorization'); + if (!authorization) { + return 'MISSING'; + } + const match = authorization.match(/^Bearer (.*)$/); + if (match) { + const idToken = match[1]; + try { + let authToken: firebase.auth.DecodedIdToken; + if (isDebugFeatureEnabled('skipTokenVerification')) { + authToken = unsafeDecodeIdToken(idToken); + } else { + authToken = await apps() + .admin.auth() + .verifyIdToken(idToken); + } + ctx.auth = { + uid: authToken.uid, + token: authToken, + }; + return 'VALID'; + } catch (err) { + logger.warn('Failed to validate auth token.', err); + return 'INVALID'; + } + } +} + +/** @hidden */ +async function checkAppCheckToken( + req: Request, + ctx: CallableContext +): Promise { + const appCheck = req.header('X-Firebase-AppCheck'); + if (!appCheck) { + return 'MISSING'; + } + try { + if (!apps().admin.appCheck) { + throw new Error( + 'Cannot validate AppCheck token. Please update Firebase Admin SDK to >= v9.8.0' + ); + } + let appCheckData; + if (isDebugFeatureEnabled('skipTokenVerification')) { + const decodedToken = unsafeDecodeAppCheckToken(appCheck); + appCheckData = { appId: decodedToken.app_id, token: decodedToken }; + } else { + appCheckData = await apps() + .admin.appCheck() + .verifyToken(appCheck); + } + ctx.app = appCheckData; + return 'VALID'; + } catch (err) { + logger.warn('Failed to validate AppCheck token.', err); + return 'INVALID'; + } +} + +type v1CallableHandler = ( + data: any, + context: CallableContext +) => any | Promise; +type v2CallableHandler = (request: CallableRequest) => Res; +type v1TaskHandler = (data: any, context: TaskContext) => void | Promise; +type v2TaskHandler = (request: TaskRequest) => void | Promise; /** @hidden **/ export interface CallableOptions { @@ -663,7 +737,7 @@ export interface CallableOptions { /** @hidden */ export function onCallHandler( options: CallableOptions, - handler: v1Handler | v2Handler + handler: v1CallableHandler | v2CallableHandler ): (req: Request, res: express.Response) => Promise { const wrapped = wrapOnCallHandler(options, handler); return (req: Request, res: express.Response) => { @@ -679,7 +753,7 @@ export function onCallHandler( /** @internal */ function wrapOnCallHandler( options: CallableOptions, - handler: v1Handler | v2Handler + handler: v1CallableHandler | v2CallableHandler ): (req: Request, res: express.Response) => Promise { return async (req: Request, res: express.Response): Promise => { try { @@ -740,3 +814,51 @@ function wrapOnCallHandler( } }; } + +/** @internal */ +export function onEnqueueHandler( + handler: v1TaskHandler | v2TaskHandler +): (req: Request, res: express.Response) => Promise { + return async (req: Request, res: express.Response): Promise => { + try { + if (!isValidRequest(req)) { + logger.error('Invalid request, unable to process.'); + throw new HttpsError('invalid-argument', 'Bad Request'); + } + + const context: TaskContext = {}; + const status = await checkAuthToken(req, context); + // Note: this should never happen since task queue functions are guarded by IAM. + if (status === 'INVALID') { + throw new HttpsError('unauthenticated', 'Unauthenticated'); + } + + const data: Req = decode(req.body.data); + if (handler.length === 2) { + await handler(data, context); + } else { + const arg: TaskRequest = { + ...context, + data, + }; + // For some reason the type system isn't picking up that the handler + // is a one argument function. + await (handler as any)(arg); + } + + // If there was some result, encode it in the body. + res.status(204).end(); + } catch (err) { + if (!(err instanceof HttpsError)) { + // This doesn't count as an 'explicit' error. + logger.error('Unhandled error', err); + err = new HttpsError('internal', 'INTERNAL'); + } + + const { status } = err.httpErrorCode; + const body = { error: err.toJSON() }; + + res.status(status).send(body); + } + }; +} diff --git a/src/function-builder.ts b/src/function-builder.ts index ada3b724c..9eaad45f8 100644 --- a/src/function-builder.ts +++ b/src/function-builder.ts @@ -355,6 +355,14 @@ export class FunctionBuilder { context: https.CallableContext ) => any | Promise ) => https._onCallWithOptions(handler, this.options), + + /** + * Declares a task queue function for clients to call using a Firebase Admin SDK. + * @param handler A method that takes a data and context and returns void or Promise. + */ + taskQueue: (options?: https.TaskQueueOptions) => { + return new https.TaskQueueBuilder(options, this.options); + }, }; } diff --git a/src/handler-builder.ts b/src/handler-builder.ts index ad4cd1541..2f66d1572 100644 --- a/src/handler-builder.ts +++ b/src/handler-builder.ts @@ -82,6 +82,22 @@ export class HandlerBuilder { func.__trigger = {}; return func; }, + /** @hidden */ + get taskQueue() { + return { + onEnqueue( + handler: ( + data: any, + context: https.TaskContext + ) => void | Promise + ) { + const builder = new https.TaskQueueBuilder(); + const func = builder.onEnqueue(handler); + func.__trigger = {}; + return func; + }, + }; + }, }; } diff --git a/src/providers/https.ts b/src/providers/https.ts index 8a48f41fe..27fdb3cde 100644 --- a/src/providers/https.ts +++ b/src/providers/https.ts @@ -23,17 +23,36 @@ import * as express from 'express'; import { HttpsFunction, optionsToTrigger, Runnable } from '../cloud-functions'; -import { convertIfPresent, convertInvoker } from '../common/encoding'; +import { + convertIfPresent, + convertInvoker, + copyIfPresent, +} from '../common/encoding'; import { CallableContext, FunctionsErrorCode, HttpsError, onCallHandler, + onEnqueueHandler, Request, + TaskContext, + TaskRateLimits, + TaskRetryConfig, } from '../common/providers/https'; import { DeploymentOptions } from '../function-configuration'; -export { Request, CallableContext, FunctionsErrorCode, HttpsError }; +export { + Request, + CallableContext, + FunctionsErrorCode, + HttpsError, + /** @hidden */ + TaskRetryConfig as TaskRetryPolicy, + /** @hidden */ + TaskRateLimits, + /** @hidden */ + TaskContext, +}; /** * Handle HTTP requests. @@ -56,6 +75,75 @@ export function onCall( return _onCallWithOptions(handler, {}); } +/** + * Configurations for Task Queue Functions. + * @hidden + */ +export interface TaskQueueOptions { + retryConfig?: TaskRetryConfig; + rateLimits?: TaskRateLimits; + + /** + * Who can enqueue tasks for this function. + * If left unspecified, only service accounts which have + * roles/cloudtasks.enqueuer and roles/cloudfunctions.invoker + * will have permissions. + */ + invoker?: 'private' | string | string[]; +} + +export interface TaskQueueFunction { + (req: Request, res: express.Response): Promise; + __trigger: unknown; + run(data: any, context: TaskContext): void | Promise; +} + +export class TaskQueueBuilder { + /** @internal */ + constructor( + private readonly tqOpts?: TaskQueueOptions, + private readonly depOpts?: DeploymentOptions + ) {} + + onEnqueue( + handler: (data: any, context: TaskContext) => void | Promise + ): TaskQueueFunction { + // onCallHandler sniffs the function length of the passed-in callback + // and the user could have only tried to listen to data. Wrap their handler + // in another handler to avoid accidentally triggering the v2 API + const fixedLen = (data: any, context: TaskContext) => + handler(data, context); + const func: any = onEnqueueHandler(fixedLen); + + func.__trigger = { + ...optionsToTrigger(this.depOpts || {}), + taskQueueTrigger: {}, + }; + copyIfPresent(func.__trigger.taskQueueTrigger, this.tqOpts, 'retryConfig'); + copyIfPresent(func.__trigger.taskQueueTrigger, this.tqOpts, 'rateLimits'); + convertIfPresent( + func.__trigger.taskQueueTrigger, + this.tqOpts, + 'invoker', + 'invoker', + convertInvoker + ); + + func.run = handler; + + return func; + } +} + +/** + * Declares a function that can handle tasks enqueued using the Firebase Admin SDK. + * @param options Configuration for the Task Queue that feeds into this function. + * @hidden + */ +export function taskQueue(options?: TaskQueueOptions): TaskQueueBuilder { + return new TaskQueueBuilder(options); +} + /** @hidden */ export function _onRequestWithOptions( handler: (req: Request, resp: express.Response) => void | Promise, diff --git a/src/v2/providers/https.ts b/src/v2/providers/https.ts index 9e4c466e4..843889b59 100644 --- a/src/v2/providers/https.ts +++ b/src/v2/providers/https.ts @@ -22,18 +22,34 @@ import * as cors from 'cors'; import * as express from 'express'; -import { convertIfPresent, convertInvoker } from '../../common/encoding'; +import { + convertIfPresent, + convertInvoker, + copyIfPresent, +} from '../../common/encoding'; import { CallableRequest, FunctionsErrorCode, HttpsError, onCallHandler, + onEnqueueHandler, Request, + TaskRateLimits, + TaskRequest, + TaskRetryConfig, } from '../../common/providers/https'; import * as options from '../options'; -export { Request, CallableRequest, FunctionsErrorCode, HttpsError }; +export { + Request, + CallableRequest, + FunctionsErrorCode, + HttpsError, + TaskRateLimits, + TaskRequest, + TaskRetryConfig as TaskRetryPolicy, +}; export interface HttpsOptions extends Omit { region?: @@ -43,6 +59,12 @@ export interface HttpsOptions extends Omit { cors?: string | boolean | RegExp | Array; } +export interface TaskQueueOptions extends options.GlobalOptions { + retryConfig?: TaskRetryConfig; + rateLimits?: TaskRateLimits; + invoker?: 'private' | string | string[]; +} + export type HttpsFunction = (( req: Request, res: express.Response @@ -50,6 +72,9 @@ export type HttpsFunction = (( export interface CallableFunction extends HttpsFunction { run(data: CallableRequest): Return; } +export interface TaskQueueFunction extends HttpsFunction { + run(data: TaskRequest): void | Promise; +} export function onRequest( opts: HttpsOptions, @@ -194,3 +219,72 @@ export function onCall>( func.run = handler; return func; } + +/** Handle a request sent to a Cloud Tasks queue. */ +export function onTaskEnqueue( + handler: (request: TaskRequest) => void | Promise +): TaskQueueFunction; + +/** Handle a request sent to a Cloud Tasks queue. */ +export function onTaskEnqueue( + options: TaskQueueOptions, + handler: (request: TaskRequest) => void | Promise +): TaskQueueFunction; + +export function onTaskEnqueue( + optsOrHandler: + | TaskQueueOptions + | ((request: TaskRequest) => void | Promise), + handler?: (request: TaskRequest) => void | Promise +): TaskQueueFunction { + let opts: TaskQueueOptions; + if (arguments.length == 1) { + opts = {}; + handler = optsOrHandler as ( + request: TaskRequest + ) => void | Promise; + } else { + opts = optsOrHandler as TaskQueueOptions; + } + + // wrapTaskHandler sniffs the function length to determine which API to present. + // fix the length to prevent api versions from being mismatched. + const fixedLen = (req: TaskRequest) => handler(req); + const func: any = onEnqueueHandler(fixedLen); + + Object.defineProperty(func, '__trigger', { + get: () => { + const baseOpts = options.optionsToTriggerAnnotations( + options.getGlobalOptions() + ); + // global options calls region a scalar and https allows it to be an array, + // but optionsToTriggerAnnotations handles both cases. + const specificOpts = options.optionsToTriggerAnnotations( + opts as options.GlobalOptions + ); + const taskQueueTrigger: Record = {}; + copyIfPresent(taskQueueTrigger, opts, 'retryConfig', 'rateLimits'); + convertIfPresent( + taskQueueTrigger, + opts, + 'invoker', + 'invoker', + convertInvoker + ); + return { + apiVersion: 2, + platform: 'gcfv2', + ...baseOpts, + ...specificOpts, + labels: { + ...baseOpts?.labels, + ...specificOpts?.labels, + }, + taskQueueTrigger, + }; + }, + }); + + func.run = handler; + return func; +} From 64320f7a8e0c9364c1f527ceea8c37afcb8aab6d Mon Sep 17 00:00:00 2001 From: Thomas Bouldin Date: Fri, 5 Nov 2021 07:42:41 -0700 Subject: [PATCH 2/3] Adjust visibility labels & add doc --- src/common/providers/https.ts | 12 +++++++----- src/function-builder.ts | 1 + 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/src/common/providers/https.ts b/src/common/providers/https.ts index dfece442a..491cfd58f 100644 --- a/src/common/providers/https.ts +++ b/src/common/providers/https.ts @@ -196,6 +196,7 @@ export interface TaskRateLimits { maxDispatchesPerSecond?: number; } +/** Metadata about a call to a Task Queue function. */ export interface TaskContext { /** * The result of decoding and verifying an ODIC token. @@ -610,13 +611,14 @@ export function unsafeDecodeAppCheckToken(token: string): DecodedAppCheckToken { * @param {CallableContext} ctx - Context to be sent to callable function handler. * @return {CallableTokenStatus} Status of the token verifications. */ +/** @internal */ async function checkTokens( req: Request, ctx: CallableContext ): Promise { const verifications: CallableTokenStatus = { - auth: 'INVALID', app: 'INVALID', + auth: 'INVALID', }; await Promise.all([ @@ -655,7 +657,7 @@ async function checkTokens( return verifications; } -/** @hidden */ +/** @interanl */ async function checkAuthToken( req: Request, ctx: CallableContext | TaskContext @@ -688,7 +690,7 @@ async function checkAuthToken( } } -/** @hidden */ +/** @internal */ async function checkAppCheckToken( req: Request, ctx: CallableContext @@ -728,13 +730,13 @@ type v2CallableHandler = (request: CallableRequest) => Res; type v1TaskHandler = (data: any, context: TaskContext) => void | Promise; type v2TaskHandler = (request: TaskRequest) => void | Promise; -/** @hidden **/ +/** @internal **/ export interface CallableOptions { cors: cors.CorsOptions; allowInvalidAppCheckToken?: boolean; } -/** @hidden */ +/** @internal */ export function onCallHandler( options: CallableOptions, handler: v1CallableHandler | v2CallableHandler diff --git a/src/function-builder.ts b/src/function-builder.ts index 9eaad45f8..f8a6b19d2 100644 --- a/src/function-builder.ts +++ b/src/function-builder.ts @@ -360,6 +360,7 @@ export class FunctionBuilder { * Declares a task queue function for clients to call using a Firebase Admin SDK. * @param handler A method that takes a data and context and returns void or Promise. */ + /** @hidden */ taskQueue: (options?: https.TaskQueueOptions) => { return new https.TaskQueueBuilder(options, this.options); }, From 11e03a8777f8f5de7af0bc44d6bac8b808ea8019 Mon Sep 17 00:00:00 2001 From: Thomas Bouldin Date: Thu, 11 Nov 2021 09:14:01 -0800 Subject: [PATCH 3/3] PR feedback --- src/common/providers/https.ts | 3 +-- src/providers/https.ts | 4 +++- src/v2/providers/https.ts | 8 +++++++- 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/src/common/providers/https.ts b/src/common/providers/https.ts index 491cfd58f..0340bbce4 100644 --- a/src/common/providers/https.ts +++ b/src/common/providers/https.ts @@ -171,7 +171,7 @@ export interface CallableRequest { /** How a task should be retried in the event of a non-2xx return. */ export interface TaskRetryConfig { - // If left unspecified, will default to 5 + // If left unspecified, will default to 3 maxAttempts?: number; // If left unspecified will default to 1hr @@ -848,7 +848,6 @@ export function onEnqueueHandler( await (handler as any)(arg); } - // If there was some result, encode it in the body. res.status(204).end(); } catch (err) { if (!(err instanceof HttpsError)) { diff --git a/src/providers/https.ts b/src/providers/https.ts index 27fdb3cde..7039e37a0 100644 --- a/src/providers/https.ts +++ b/src/providers/https.ts @@ -92,12 +92,14 @@ export interface TaskQueueOptions { invoker?: 'private' | string | string[]; } +/** @hidden */ export interface TaskQueueFunction { (req: Request, res: express.Response): Promise; __trigger: unknown; run(data: any, context: TaskContext): void | Promise; } +/** @hidden */ export class TaskQueueBuilder { /** @internal */ constructor( @@ -108,7 +110,7 @@ export class TaskQueueBuilder { onEnqueue( handler: (data: any, context: TaskContext) => void | Promise ): TaskQueueFunction { - // onCallHandler sniffs the function length of the passed-in callback + // onEnqueueHandler sniffs the function length of the passed-in callback // and the user could have only tried to listen to data. Wrap their handler // in another handler to avoid accidentally triggering the v2 API const fixedLen = (data: any, context: TaskContext) => diff --git a/src/v2/providers/https.ts b/src/v2/providers/https.ts index 843889b59..6f6725e63 100644 --- a/src/v2/providers/https.ts +++ b/src/v2/providers/https.ts @@ -62,6 +62,12 @@ export interface HttpsOptions extends Omit { export interface TaskQueueOptions extends options.GlobalOptions { retryConfig?: TaskRetryConfig; rateLimits?: TaskRateLimits; + /** + * Who can enqueue tasks for this function. + * If left unspecified, only service accounts which have + * roles/cloudtasks.enqueuer and roles/cloudfunctions.invoker + * will have permissions. + */ invoker?: 'private' | string | string[]; } @@ -247,7 +253,7 @@ export function onTaskEnqueue( opts = optsOrHandler as TaskQueueOptions; } - // wrapTaskHandler sniffs the function length to determine which API to present. + // onEnqueueHandler sniffs the function length to determine which API to present. // fix the length to prevent api versions from being mismatched. const fixedLen = (req: TaskRequest) => handler(req); const func: any = onEnqueueHandler(fixedLen);