-
-
Notifications
You must be signed in to change notification settings - Fork 11
/
reply.js
106 lines (96 loc) · 4.66 KB
/
reply.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import { unwrap } from './ChannelParameterUnwrap';
import { realizeChannelName, camelCase, includeUnsubAfterForSubscription, messageHasNullPayload, getMessageType, realizeParametersForChannelWrapper, includeQueueForSubscription, shouldPromisifyCallbacks, renderJSDocParameters } from '../../utils/index';
// eslint-disable-next-line no-unused-vars
import { Message, ChannelParameter } from '@asyncapi/parser';
/**
* @typedef TemplateParameters
* @type {object}
* @property {boolean|string} generateTestClient - whether or not test client should be generated.
* @property {boolean|string} promisifyReplyCallback - whether or not reply callbacks should be promisified.
*/
/**
* Component which returns a function which sets up a reply for a given channel
*
* @param {string} channelName to reply to
* @param {Message} replyMessage used to reply to request
* @param {Message} receiveMessage which is received by the request
* @param {Object.<string, ChannelParameter>} channelParameters parameters to the channel
* @param {TemplateParameters} params template parameters
*/
export function Reply(channelName, replyMessage, receiveMessage, channelParameters, params, operation) {
const replyMessageType = getMessageType(replyMessage);
const receiveMessageType = getMessageType(receiveMessage);
//Create an array of all the parameter names
let parameters = [];
parameters = Object.entries(channelParameters).map(([parameterName, _]) => {
return `${camelCase(parameterName)}Param`;
});
const requestHasNullPayload = messageHasNullPayload(receiveMessage.payload());
const replyHasNullPayload = messageHasNullPayload(replyMessage.payload());
//Determine the receiving process based on whether the payload type is null
let receivingOperation = `let message = ${shouldPromisifyCallbacks(params) ? 'await' : ''} onRequest(undefined, null ${parameters.length > 0 ? `, ${parameters.join(',')}` : ''});`;
if (!requestHasNullPayload) {
receivingOperation = `
let receivedData : any = codec.decode(msg.data);
let replyMessage = ${shouldPromisifyCallbacks(params) ? 'await' : ''} onRequest(undefined, ${receiveMessageType}.unmarshal(receivedData) ${parameters.length > 0 ? `, ${parameters.join(',')}` : ''});
`;
}
//Determine the reply process based on whether the payload type is null
let replyOperation = 'msg.respond(Nats.Empty);';
if (!replyHasNullPayload) {
replyOperation = `
let dataToSend : any = replyMessage.marshal();
dataToSend = codec.encode(dataToSend);
msg.respond(dataToSend);
`;
}
return `
/**
* Internal functionality to setup reply to the \`${channelName}\` channel
*
* @param onRequest called when request is received
* @param onReplyError called when it was not possible to send the reply
* @param client to setup reply with
* @param codec used to convert messages
${renderJSDocParameters(channelParameters)}
* @param options to subscribe with, bindings from the AsyncAPI document overwrite these if specified
*/
export function reply(
onRequest : (
err?: NatsTypescriptTemplateError,
msg?: ${getMessageType(receiveMessage)}
${realizeParametersForChannelWrapper(channelParameters, false)}
) => ${shouldPromisifyCallbacks(params) ? 'Promise<' : ''}${replyMessageType}${ shouldPromisifyCallbacks(params) ? '>' : ''},
onReplyError: (err: NatsTypescriptTemplateError) => void,
nc: Nats.NatsConnection,
codec: Nats.Codec<any>
${realizeParametersForChannelWrapper(channelParameters)},
options?: Nats.SubscriptionOptions
): Promise<Nats.Subscription> {
return new Promise(async (resolve, reject) => {
try {
let subscribeOptions: Nats.SubscriptionOptions = {... options};
${includeQueueForSubscription(operation)}
${includeUnsubAfterForSubscription(operation)}
let subscription = nc.subscribe(${realizeChannelName(channelParameters, channelName)}, subscribeOptions);
(async () => {
for await (const msg of subscription) {
${unwrap(channelName, channelParameters)}
${receivingOperation}
if (msg.reply) {
${replyOperation}
} else {
let error = new NatsTypescriptTemplateError('Expected request to need a reply, did not..', '000');
onReplyError(error)
return;
}
}
})();
resolve(subscription);
} catch (e: any) {
reject(NatsTypescriptTemplateError.errorForCode(ErrorCode.INTERNAL_NATS_TS_ERROR, e));
}
})
}
`;
}