Skip to content

Commit

Permalink
feat: add callback for invocation end with streaming functions (#245)
Browse files Browse the repository at this point in the history
* feat: add callback for invocation end with streaming functions

* add docs

* call onInvocationEnd also if streaming is unused

* ensure invocationend isn't called early for streams

* remove unused symbol

* fix last failing test
  • Loading branch information
Skn0tt committed Apr 13, 2024
1 parent 95fb3df commit 5ad5685
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 5 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ Executes a lambda given the `options` object, which is a dictionary where the ke
| `envdestroy`|optional, destroy added environment on closing, default to false|
| `verboseLevel`|optional, default 3. Level 2 dismiss handler() text, level 1 dismiss lambda-local text and level 0 dismiss also the result.|
| `callback`|optional, lambda third parameter [callback][1]. When left out a Promise is returned|
| `onInvocationEnd`|optional. called once the invocation ended. useful when awslambda.streamifyResponse is used to distinguish between end of response stream and end of invocation. |
| `clientContext`|optional, used to populated clientContext property of lambda second parameter (context)

#### `lambdaLocal.setLogger(logger)`
Expand Down
4 changes: 3 additions & 1 deletion src/lambdalocal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ function _executeSync(opts) {
timeoutMs = opts.timeoutMs || 3000,
verboseLevel = opts.verboseLevel,
callback = opts.callback,
onInvocationEnd = opts.onInvocationEnd,
clientContext = null;

if (opts.clientContext) {
Expand Down Expand Up @@ -295,7 +296,8 @@ function _executeSync(opts) {
});
}
},
clientContext: clientContext
clientContext: clientContext,
onInvocationEnd: onInvocationEnd,
});

if(callback) context.callback = callback;
Expand Down
16 changes: 15 additions & 1 deletion src/lib/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import utils = require('./utils.js');
import mute = require('./mute.js');
import { StreamingBody } from './streaming.js';

function Context() {
this.logger = null;
Expand Down Expand Up @@ -39,6 +40,7 @@ function Context() {
this.logStreamName = 'Stream name';
this.identity = null;
this.clientContext = null;
this.onInvocationEnd = null;

/*
* callback function called after done
Expand Down Expand Up @@ -112,6 +114,7 @@ Context.prototype._initialize = function(options) {
this.unmute = mute();
}
this.clientContext = options.clientContext;
this.onInvocationEnd = options.onInvocationEnd;

return;
};
Expand Down Expand Up @@ -149,7 +152,12 @@ Context.prototype.generate_context = function(){
logStreamName: this.logStreamName,
identity: this.identity,
clientContext: this.clientContext,
_stopped: false
_stopped: false,

// INTERNAL
__lambdaLocal: {
onInvocationEnd: this.onInvocationEnd
},
};
return ctx;
}
Expand Down Expand Up @@ -207,6 +215,12 @@ Context.prototype.done = function(err, message) {
}
}
this.finalCallback(); //Destroy env...

const isStream = typeof message === "object" && message?.body instanceof StreamingBody
if (!isStream) {
this.onInvocationEnd?.();
}

/*
The finalCallback method will be instantly called if 'this.callbackWaitsForEmptyEventLoop' is False
Otherwise, lambda-local will wait for an empty loop then call it.
Expand Down
4 changes: 3 additions & 1 deletion src/lib/streaming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ function streamifyResponse(handler) {
if (!body.headersSent) {
body.sendHeader(metadata)
}
context.__lambdaLocal.onInvocationEnd?.();
} catch (error) {
reject(error);
context.__lambdaLocal.onInvocationEnd?.(error);
}
});
}

class StreamingBody extends PassThrough {
export class StreamingBody extends PassThrough {
constructor(private readonly resolve: (metadata) => void) {
super();
}
Expand Down
2 changes: 2 additions & 0 deletions test/functs/test-func-streaming.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ exports.handler = awslambda.streamifyResponse(
responseStream.write("bar");
responseStream.end();
}, 100);

await new Promise(resolve => setTimeout(resolve, 200));
}
);

24 changes: 22 additions & 2 deletions test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,16 @@ describe("- Testing lambdalocal.js", function () {
assert.equal(data.result, "testvar");
});
});
it('should call onInvocationEnd', function () {
var lambdalocal = require(lambdalocal_path);
lambdalocal.setLogger(winston);
let invocationEnded = 0
opts.onInvocationEnd = () => invocationEnded++
return lambdalocal.execute(opts).then(function (data) {
assert.equal(data.result, "testvar");
assert.equal(invocationEnded, 1)
});
});
it('should be stateless', function () {
var lambdalocal = require(lambdalocal_path);
lambdalocal.setLogger(winston);
Expand Down Expand Up @@ -425,13 +435,17 @@ describe("- Testing lambdalocal.js", function () {
it('should return a readable stream as `body`', function () {
var lambdalocal = require(lambdalocal_path);
lambdalocal.setLogger(winston);
let invocationEnded = 0
return lambdalocal.execute({
event: require(path.join(__dirname, "./events/test-event.js")),
lambdaPath: path.join(__dirname, "./functs/test-func-streaming.js"),
lambdaHandler: functionName,
callbackWaitsForEmptyEventLoop: false,
timeoutMs: timeoutMs,
verboseLevel: 1
verboseLevel: 1,
onInvocationEnd() {
invocationEnded++
}
}).then(function (data) {
assert.deepEqual(
data.headers,
Expand All @@ -448,7 +462,13 @@ describe("- Testing lambdalocal.js", function () {
data.body.on("end", () => {
assert.deepEqual(chunks, ["foo", "bar"])
assert.closeTo(times[1] - times[0], 100, 50)
resolve()

assert.equal(invocationEnded, 0)

setTimeout(() => {
assert.equal(invocationEnded, 1)
resolve()
}, 200)
});
})
})
Expand Down

0 comments on commit 5ad5685

Please sign in to comment.