Skip to content

Commit

Permalink
add S3 related events for monitoring purposes #38
Browse files Browse the repository at this point in the history
  • Loading branch information
regevbr committed May 25, 2019
1 parent ae8b37e commit 1daac30
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 2 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ build/Release
node_modules/
.idea/
dist/
build/
jspm_packages/

# Typescript v1 declaration files
Expand Down
10 changes: 9 additions & 1 deletion src/Message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ interface IMessageEvents {
deleted: string;
autoExtendFail: AWSError;
autoExtendError: AWSError;
s3Download: IS3Upload;
s3Delete: IS3Upload;
}

type MessageEmitter = StrictEventEmitter<EventEmitter, IMessageEvents>;
Expand Down Expand Up @@ -99,9 +101,15 @@ export class Message extends (EventEmitter as new() => MessageEmitter) {
const s3 = this._s3Retriever();
promise = getBlob(s3, uploadData)
.then((resolvedBody) => {
this.emit('s3Download', uploadData);
this._squiss.emit('s3Download', {data: uploadData, message: this});
if (!this._s3Retain) {
this._deleteCallback = () => {
return deleteBlob(s3, uploadData);
return deleteBlob(s3, uploadData)
.then(() => {
this.emit('s3Delete', uploadData);
this._squiss.emit('s3Delete', {data: uploadData, message: this});
});
};
}
return Promise.resolve(resolvedBody);
Expand Down
11 changes: 10 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {createMessageAttributes, IMessageAttributes} from './attributeUtils';
import {isString} from 'ts-type-guards';
import {SQS, S3} from 'aws-sdk';
import {GZIP_MARKER, compressMessage} from './gzipUtils';
import {S3_MARKER, uploadBlob} from './s3Utils';
import {IS3Upload, S3_MARKER, uploadBlob} from './s3Utils';
import {getMessageSize} from './messageSizeUtils';
import {BatchResultErrorEntry} from 'aws-sdk/clients/sqs';
import {AWSError} from 'aws-sdk';
Expand Down Expand Up @@ -129,6 +129,11 @@ export interface IMessageDeleteErrorEventPayload {
error: BatchResultErrorEntry;
}

export interface IMessageS3EventPayload {
message: Message;
data: IS3Upload;
}

interface ISquissEvents {
delQueued: Message;
handled: Message;
Expand All @@ -148,6 +153,9 @@ interface ISquissEvents {
delError: IMessageDeleteErrorEventPayload;
autoExtendFail: IMessageErrorEventPayload;
autoExtendError: IMessageErrorEventPayload;
s3Download: IMessageS3EventPayload;
s3Delete: IMessageS3EventPayload;
s3Upload: IS3Upload;
}

type SquissEmitter = StrictEventEmitter<EventEmitter, ISquissEvents>;
Expand Down Expand Up @@ -755,6 +763,7 @@ export class Squiss extends (EventEmitter as new() => SquissEmitter) {
}
return uploadBlob(this.getS3(), this._opts.s3Bucket!, finalMessage, this._opts.s3Prefix || '')
.then((uploadData) => {
this.emit('s3Upload', uploadData);
params.MessageBody = JSON.stringify(uploadData);
params.MessageAttributes = params.MessageAttributes || {};
params.MessageAttributes[S3_MARKER] = {
Expand Down
6 changes: 6 additions & 0 deletions test/src/Message.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,9 @@ describe('Message', () => {
toDel.should.equal(msg);
done();
},
emit: (event: string, data: any) => {

},
} as any as Squiss,
msg: rawMsg,
bodyFormat: 'json',
Expand Down Expand Up @@ -294,6 +297,9 @@ describe('Message', () => {
toDel.should.equal(msg);
done();
},
emit: (event: string, data: any) => {

},
} as any as Squiss,
msg: rawMsg,
bodyFormat: 'json',
Expand Down

0 comments on commit 1daac30

Please sign in to comment.