Skip to content

Commit

Permalink
Scheduled functions via pubsub (#227)
Browse files Browse the repository at this point in the history
* Scheduler

* unit tests

* Adds integration tests for scheduled functions

* removing uninented commit

* fixing version number

* adding labels and test for labels

* trigger tests off of call to cloud scheduler:run

* adding region/runtime opts tests

* Predictions Functions (#220)

* Firebase Predictions Integration with Functions - Initial check in

* Add predictions getter

* Make API match what was in the review, and add unit tests.

* Fix imports.

* Add error handling.

* Handle process.env.GCLOUD_PROJECT correctly.

* Fix region.

* Upper case RiskToleranceName.

* Add changelog message.

* formatting

* cleaning up

* Revert "Predictions Functions (#220)" (#226)

This reverts commit 838597aa6df973462a50484ec1f50d625da3f633.

* pr fixes

* adding clarifying comments

* starting on pubsub impl

* adds pusbub.schedule()

* switching tests over from https to pubsub

* Change timezone to a method instead of an optional argument

* use real eventType

* remove extra slash

* changelog

* switch signature to onRun(context)

* switches changelog to present tense
  • Loading branch information
joehan committed Apr 15, 2019
1 parent d7fd3d1 commit a683252
Show file tree
Hide file tree
Showing 9 changed files with 308 additions and 16 deletions.
4 changes: 1 addition & 3 deletions changelog.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1 @@
feature - Adds functions.app() api to access the app instance used by functions
fixed - improved types of the `Change` class to describe both `before` and `after` fields as non-optional
fixed - Improve type of express.Request to include rawBody
feature - Adds pubsub.schedule()
36 changes: 35 additions & 1 deletion integration_test/functions/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,33 @@ function callHttpsTrigger(name: string, data: any, baseUrl) {
});
}

function callScheduleTrigger(functionName: string, region: string) {
return new Promise((resolve, reject) => {
const request = https.request(
{
method: 'POST',
host: 'cloudscheduler.googleapis.com',
path: `projects/${
firebaseConfig.projectId
}/locations/us-central1/jobs/firebase-schedule-${functionName}-${region}:run`,
headers: {
'Content-Type': 'application/json',
},
},
response => {
let body = '';
response.on('data', chunk => {
body += chunk;
});
response.on('end', () => resolve(body));
}
);
request.on('error', reject);
request.write('{}');
request.end();
});
}

export const integrationTests: any = functions
.runWith({
timeoutSeconds: 540,
Expand All @@ -62,6 +89,10 @@ export const integrationTests: any = functions
.database()
.ref()
.push().key;
admin
.database()
.ref(`testRuns/${testId}/timestamp`)
.set(Date.now());
console.log('testId is: ', testId);
fs.writeFile('/tmp/' + testId + '.txt', 'test', () => {});
return Promise.all([
Expand Down Expand Up @@ -120,6 +151,9 @@ export const integrationTests: any = functions
.bucket()
.upload('/tmp/' + testId + '.txt'),
// Invoke a callable HTTPS trigger.
callHttpsTrigger('callableTests', { foo: 'bar', testId }),
// Invoke the schedule for our scheduled function to fire
callScheduleTrigger('schedule', 'us-central1'),
])
.then(() => {
// On test completion, check that all tests pass and reply "PASS", or provide further details.
Expand All @@ -129,7 +163,7 @@ export const integrationTests: any = functions
let testsExecuted = 0;
ref.on('child_added', snapshot => {
testsExecuted += 1;
if (!snapshot.val().passed) {
if (snapshot.key != 'timestamp' && !snapshot.val().passed) {
reject(
new Error(
`test ${snapshot.key} failed; see database for details.`
Expand Down
24 changes: 23 additions & 1 deletion integration_test/functions/src/pubsub-tests.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import * as functions from 'firebase-functions';
import { TestSuite, expectEq, evaluate } from './testing';
import * as admin from 'firebase-admin';
import { TestSuite, expectEq, evaluate, success } from './testing';
import PubsubMessage = functions.pubsub.Message;

// TODO(inlined) use multiple queues to run inline.
Expand Down Expand Up @@ -57,3 +58,24 @@ export const pubsubTests: any = functions.pubsub

.run(testId, m, c);
});

export const schedule: any = functions.pubsub
.schedule('every 10 hours') // This is a dummy schedule, since we need to put a valid one in.
// For the test, the job is triggered by the jobs:run api
.onRun(context => {
let testId;
let db = admin.database();
return new Promise(async (resolve, reject) => {
await db
.ref('testRuns')
.orderByChild('timestamp')
.limitToLast(1)
.on('value', snap => {
testId = Object.keys(snap.val())[0];
new TestSuite('pubsub scheduleOnRun')
.it('should trigger when the scheduler fires', () => success())
.run(testId, null);
});
resolve();
});
});
2 changes: 1 addition & 1 deletion integration_test/functions/src/testing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ export class TestSuite<T> {
}
}

function success() {
export function success() {
return Promise.resolve().then(() => true);
}

Expand Down
6 changes: 3 additions & 3 deletions spec/providers/https.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@
import { expect } from 'chai';
import * as express from 'express';
import * as firebase from 'firebase-admin';
import * as https from '../../src/providers/https';
import * as jwt from 'jsonwebtoken';
import * as mocks from '../fixtures/credential/key.json';
import * as nock from 'nock';
import * as _ from 'lodash';
import * as nock from 'nock';
import { apps as appsNamespace } from '../../src/apps';
import * as functions from '../../src/index';
import * as https from '../../src/providers/https';
import * as mocks from '../fixtures/credential/key.json';

describe('CloudHttpsBuilder', () => {
describe('#onRequest', () => {
Expand Down
152 changes: 152 additions & 0 deletions spec/providers/pubsub.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,158 @@ describe('Pubsub Functions', () => {
});
});
});
describe('#schedule', () => {
it('should return a trigger with schedule', () => {
let result = pubsub.schedule('every 5 minutes').onRun(context => null);
expect(result.__trigger.schedule).to.deep.equal({
schedule: 'every 5 minutes',
});
});
it('should return a trigger with schedule and timeZone when one is chosen', () => {
let result = pubsub
.schedule('every 5 minutes')
.timeZone('America/New_York')
.onRun(context => null);
expect(result.__trigger.schedule).to.deep.equal({
schedule: 'every 5 minutes',
timeZone: 'America/New_York',
});
});
it('should return a trigger with schedule and retry config when called with retryConfig', () => {
let retryConfig = {
retryCount: 3,
maxRetryDuration: '10 minutes',
minBackoffDuration: '10 minutes',
maxBackoffDuration: '10 minutes',
maxDoublings: 5,
};
let result = pubsub
.schedule('every 5 minutes')
.retryConfig(retryConfig)
.onRun(() => null);
expect(result.__trigger.schedule).to.deep.equal({
schedule: 'every 5 minutes',
retryConfig: retryConfig,
});
expect(result.__trigger.labels).to.deep.equal({
'deployment-scheduled': 'true',
});
});
it('should return a trigger with schedule, timeZone and retry config when called with retryConfig and timeout', () => {
let retryConfig = {
retryCount: 3,
maxRetryDuration: '10 minutes',
minBackoffDuration: '10 minutes',
maxBackoffDuration: '10 minutes',
maxDoublings: 5,
};
let result = pubsub
.schedule('every 5 minutes')
.timeZone('America/New_York')
.retryConfig(retryConfig)
.onRun(() => null);
expect(result.__trigger.schedule).to.deep.equal({
schedule: 'every 5 minutes',
retryConfig: retryConfig,
timeZone: 'America/New_York',
});
expect(result.__trigger.labels).to.deep.equal({
'deployment-scheduled': 'true',
});
});
it('should return an appropriate trigger when called with region and options', () => {
let result = functions
.region('us-east1')
.runWith({
timeoutSeconds: 90,
memory: '256MB',
})
.pubsub.schedule('every 5 minutes')
.onRun(() => null);
expect(result.__trigger.schedule).to.deep.equal({
schedule: 'every 5 minutes',
});
expect(result.__trigger.regions).to.deep.equal(['us-east1']);
expect(result.__trigger.availableMemoryMb).to.deep.equal(256);
expect(result.__trigger.timeout).to.deep.equal('90s');
});
it('should return an appropriate trigger when called with region, timeZone, and options', () => {
let result = functions
.region('us-east1')
.runWith({
timeoutSeconds: 90,
memory: '256MB',
})
.pubsub.schedule('every 5 minutes')
.timeZone('America/New_York')
.onRun(() => null);
expect(result.__trigger.schedule).to.deep.equal({
schedule: 'every 5 minutes',
timeZone: 'America/New_York',
});
expect(result.__trigger.regions).to.deep.equal(['us-east1']);
expect(result.__trigger.availableMemoryMb).to.deep.equal(256);
expect(result.__trigger.timeout).to.deep.equal('90s');
});
it('should return an appropriate trigger when called with region, options and retryConfig', () => {
let retryConfig = {
retryCount: 3,
maxRetryDuration: '10 minutes',
minBackoffDuration: '10 minutes',
maxBackoffDuration: '10 minutes',
maxDoublings: 5,
};
let result = functions
.region('us-east1')
.runWith({
timeoutSeconds: 90,
memory: '256MB',
})
.pubsub.schedule('every 5 minutes')
.retryConfig(retryConfig)
.onRun(() => null);
expect(result.__trigger.schedule).to.deep.equal({
schedule: 'every 5 minutes',
retryConfig: retryConfig,
});
expect(result.__trigger.labels).to.deep.equal({
'deployment-scheduled': 'true',
});
expect(result.__trigger.regions).to.deep.equal(['us-east1']);
expect(result.__trigger.availableMemoryMb).to.deep.equal(256);
expect(result.__trigger.timeout).to.deep.equal('90s');
});
it('should return an appropriate trigger when called with region, options, retryConfig, and timeZone', () => {
let retryConfig = {
retryCount: 3,
maxRetryDuration: '10 minutes',
minBackoffDuration: '10 minutes',
maxBackoffDuration: '10 minutes',
maxDoublings: 5,
};
let result = functions
.region('us-east1')
.runWith({
timeoutSeconds: 90,
memory: '256MB',
})
.pubsub.schedule('every 5 minutes')
.timeZone('America/New_York')
.retryConfig(retryConfig)
.onRun(() => null);
expect(result.__trigger.schedule).to.deep.equal({
schedule: 'every 5 minutes',
timeZone: 'America/New_York',
retryConfig: retryConfig,
});
expect(result.__trigger.labels).to.deep.equal({
'deployment-scheduled': 'true',
});
expect(result.__trigger.regions).to.deep.equal(['us-east1']);
expect(result.__trigger.availableMemoryMb).to.deep.equal(256);
expect(result.__trigger.timeout).to.deep.equal('90s');
});
});
});

describe('handler namespace', () => {
Expand Down
44 changes: 37 additions & 7 deletions src/cloud-functions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

import { apps } from './apps';
import * as _ from 'lodash';
import { Request, Response } from 'express';
import * as _ from 'lodash';
import { apps } from './apps';
import { DeploymentOptions } from './function-builder';
export { Request, Response };

Expand Down Expand Up @@ -177,9 +177,24 @@ export interface TriggerAnnotated {
regions?: string[];
timeout?: string;
availableMemoryMb?: number;
schedule?: Schedule;
};
}

export interface ScheduleRetryConfig {
retryCount?: number;
maxRetryDuration?: string;
minBackoffDuration?: string;
maxBackoffDuration?: string;
maxDoublings?: number;
}

export interface Schedule {
schedule: string;
timeZone?: string;
retryConfig?: ScheduleRetryConfig;
}

/** A Runnable has a `run` method which directly invokes the user-defined function - useful for unit testing. */
export interface Runnable<T> {
run: (data: T, context: any) => PromiseLike<any> | any;
Expand Down Expand Up @@ -209,11 +224,13 @@ export interface MakeCloudFunctionArgs<EventData> {
triggerResource: () => string;
service: string;
dataConstructor?: (raw: Event) => EventData;
handler: (data: EventData, context: EventContext) => PromiseLike<any> | any;
handler?: (data: EventData, context: EventContext) => PromiseLike<any> | any;
contextOnlyHandler?: (context: EventContext) => PromiseLike<any> | any;
before?: (raw: Event) => void;
after?: (raw: Event) => void;
legacyEventType?: string;
opts?: { [key: string]: any };
labels?: { [key: string]: any };
}

/** @internal */
Expand All @@ -224,6 +241,7 @@ export function makeCloudFunction<EventData>({
service,
dataConstructor = (raw: Event) => raw.data,
handler,
contextOnlyHandler,
before = () => {
return;
},
Expand All @@ -232,6 +250,7 @@ export function makeCloudFunction<EventData>({
},
legacyEventType,
opts = {},
labels = {},
}: MakeCloudFunctionArgs<EventData>): CloudFunction<EventData> {
let cloudFunction;

Expand Down Expand Up @@ -273,8 +292,14 @@ export function makeCloudFunction<EventData>({

before(event);

let dataOrChange = dataConstructor(event);
let promise = handler(dataOrChange, context);
let promise;
if (labels && labels['deployment-scheduled']) {
// Scheduled function do not have meaningful data, so exclude it
promise = contextOnlyHandler(context);
} else {
const dataOrChange = dataConstructor(event);
promise = handler(dataOrChange, context);
}
if (typeof promise === 'undefined') {
console.warn('Function returned undefined, expected Promise or value');
}
Expand Down Expand Up @@ -320,12 +345,14 @@ export function makeCloudFunction<EventData>({
service,
},
});

if (!_.isEmpty(labels)) {
trigger.labels = labels;
}
return trigger;
},
});

cloudFunction.run = handler;
cloudFunction.run = handler || contextOnlyHandler;
return cloudFunction;
}

Expand Down Expand Up @@ -398,5 +425,8 @@ export function optsToTrigger(opts: DeploymentOptions) {
};
trigger.availableMemoryMb = _.get(memoryLookup, opts.memory);
}
if (opts.schedule) {
trigger.schedule = opts.schedule;
}
return trigger;
}
Loading

0 comments on commit a683252

Please sign in to comment.