Skip to content

Commit

Permalink
Add callback to modify message before it is getting sent
Browse files Browse the repository at this point in the history
fixes #470
  • Loading branch information
cdupuis committed Feb 11, 2019
1 parent 2e64ec4 commit e01dc8d
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 36 deletions.
82 changes: 46 additions & 36 deletions lib/internal/transport/AbstractRequestProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ export abstract class AbstractRequestProcessor implements RequestProcessor {

public processCommand(command: CommandIncoming,
// tslint:disable-next-line:no-empty
callback: (result: Promise<HandlerResult>) => void = () => { }) {
callback: (result: Promise<HandlerResult>) => void = () => {
}) {
// setup context
const ses = namespace.create();
const cls = this.setupNamespace(command, this.automations);
Expand Down Expand Up @@ -93,8 +94,9 @@ export abstract class AbstractRequestProcessor implements RequestProcessor {
}

public processEvent(event: EventIncoming,
// tslint:disable-next-line:no-empty
callback: (results: Promise<HandlerResult[]>) => void = () => { }) {
// tslint:disable-next-line:no-empty
callback: (results: Promise<HandlerResult[]>) => void = () => {
}) {
// setup context
const ses = namespace.create();
const cls = this.setupNamespace(event, this.automations);
Expand Down Expand Up @@ -154,7 +156,7 @@ export abstract class AbstractRequestProcessor implements RequestProcessor {
status: {
code,
reason: `${success ? "Successfully" : "Unsuccessfully"} invoked command` +
` ${request.command} of ${this.automations.automations.name}@${this.automations.automations.version}`,
` ${request.command} of ${this.automations.automations.name}@${this.automations.automations.version}`,
},
};
return this.sendStatusMessage(response, ctx);
Expand All @@ -175,7 +177,7 @@ export abstract class AbstractRequestProcessor implements RequestProcessor {
status: {
code: success ? 0 : 1,
reason: `${success ? "Successfully" : "Unsuccessfully"} invoked event subscription` +
` ${request.extensions.operationName} of ${this.automations.automations.name}@${this.automations.automations.version}`,
` ${request.extensions.operationName} of ${this.automations.automations.name}@${this.automations.automations.version}`,
},
};
return this.sendStatusMessage(response, ctx);
Expand Down Expand Up @@ -397,45 +399,53 @@ class AutomationEventListenerEnabledMessageClient implements MessageClient {
private listeners: AutomationEventListener[] = []) {
}

public respond(msg: any,
options?: MessageOptions): Promise<any> {
public async respond(msg: any,
options?: MessageOptions): Promise<any> {
const newMsg = await this.listeners.map(
l => m => l.messageSending(m, [], options, this.ctx))
.reduce((p, f) => p.then(f), Promise.resolve(msg));

eventStore().recordMessage(
options && options.id ? options.id : guid(),
this.ctx.correlationId,
msg);
return this.delegate.respond(msg, options)
.then(() => {
return Promise.all(
this.listeners.map(
l => l.messageSent(
msg,
[],
options,
this.ctx),
),
);
});
newMsg);

await this.delegate.respond(newMsg, options);

return Promise.all(
this.listeners.map(
l => l.messageSent(
newMsg,
[],
options,
this.ctx),
),
);
}

public send(msg: any,
destinations: Destination | Destination[],
options?: MessageOptions): Promise<any> {
public async send(msg: any,
destinations: Destination | Destination[],
options?: MessageOptions): Promise<any> {
const newMsg = await this.listeners.map(
l => m => l.messageSending(m, destinations, options, this.ctx))
.reduce((p, f) => p.then(f), Promise.resolve(msg));

eventStore().recordMessage(
options && options.id ? options.id : guid(),
this.ctx.correlationId,
msg);
return this.delegate.send(msg, destinations, options)
.then(() => {
return Promise.all(
this.listeners.map(
l => l.messageSent(
msg,
destinations,
options,
this.ctx),
),
);
});
newMsg);

await this.delegate.send(newMsg, destinations, options);

return Promise.all(
this.listeners.map(
l => l.messageSent(
newMsg,
destinations,
options,
this.ctx),
),
);
}
}

Expand Down
19 changes: 19 additions & 0 deletions lib/server/AutomationEventListener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,18 @@ export interface AutomationEventListener {
*/
eventFailed(payload: EventFired<any>, ctx: HandlerContext, err: any): Promise<void>;

/**
* Message will be sent via the MessageClient. Return a potentially modified message to be sent.
* @param message
* @param destinations
* @param options
* @param ctx
*/
messageSending(message: any,
destinations: Destination | Destination[],
options: MessageOptions,
ctx: HandlerContext): Promise<any>;

/**
* Message was sent via the MessageClient
* @param message
Expand Down Expand Up @@ -157,6 +169,13 @@ export class AutomationEventListenerSupport implements AutomationEventListener {
return Promise.resolve();
}

public messageSending(message: any,
destinations: Destination | Destination[],
options: MessageOptions,
ctx: HandlerContext): Promise<any> {
return Promise.resolve(message);
}

public messageSent(message: any,
destinations: Destination | Destination[],
options: MessageOptions,
Expand Down

0 comments on commit e01dc8d

Please sign in to comment.