Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add jetstream pull subscribe wrappers #480

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
70 changes: 70 additions & 0 deletions components/channel/jetStreamPullSubscription.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import { realizeChannelName, camelCase, getMessageType, messageHasNullPayload, realizeParametersForChannelWrapper, renderJSDocParameters} from '../../utils/index';
import { unwrap } from './ChannelParameterUnwrap';
// eslint-disable-next-line no-unused-vars
import { Message, ChannelParameter } from '@asyncapi/parser';

/**
* Component which returns a function which subscribes to the given channel
*
* @param {string} defaultContentType
* @param {string} channelName to subscribe to
* @param {Message} message which is being received
* @param {Object.<string, ChannelParameter>} channelParameters parameters to the channel
*/
export function JetstreamPullSubscription(channelName, message, channelParameters) {
const messageType = getMessageType(message);
let parameters = [];
parameters = Object.entries(channelParameters).map(([parameterName]) => {
return `${camelCase(parameterName)}Param`;
});
const hasNullPayload = messageHasNullPayload(message.payload());

//Determine the callback process when receiving messages.
//If the message payload is null no hooks are called to process the received data.
let whenReceivingMessage = `onDataCallback(undefined, null ${parameters.length > 0 && `, ${parameters.join(',')}`});`;
if (!hasNullPayload) {
whenReceivingMessage = `
let receivedData: any = codec.decode(msg.data);
onDataCallback(undefined, ${messageType}.unmarshal(receivedData) ${parameters.length > 0 && `, ${parameters.join(',')}`});
`;
}

return `
/**
* Internal functionality to setup jetstream pull subscription on the \`${channelName}\` channel
*
* @param onDataCallback to call when messages are received
* @param nc to subscribe with
* @param codec used to convert messages
${renderJSDocParameters(channelParameters)}
*/
export function jetStreamPullSubscribe(
onDataCallback: (
err ? : NatsTypescriptTemplateError,
msg?: ${messageType}
${realizeParametersForChannelWrapper(channelParameters, false)},
jetstreamMsg?: Nats.JsMsg) => void,
js: Nats.JetStreamClient,
codec: Nats.Codec < any >
${realizeParametersForChannelWrapper(channelParameters)},
options: Nats.ConsumerOptsBuilder | Partial<Nats.ConsumerOpts>
): Promise < Nats.JetStreamPullSubscription > {
return new Promise(async (resolve, reject) => {
try {
const subscription = await js.pullSubscribe(${realizeChannelName(channelParameters, channelName)}, options);

(async () => {
for await (const msg of subscription) {
${unwrap(channelName, channelParameters)}

${whenReceivingMessage}
}
})();
resolve(subscription);
} catch (e: any) {
reject(NatsTypescriptTemplateError.errorForCode(ErrorCode.INTERNAL_NATS_TS_ERROR, e));
}
})
}
`;
}
55 changes: 55 additions & 0 deletions components/index/jetStreamPullSubscription.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import { pascalCase, camelCase, getMessageType, realizeParametersForChannelWrapper, realizeParametersForChannelWithoutType, renderJSDocParameters} from '../../utils/index';
// eslint-disable-next-line no-unused-vars
import { Message, ChannelParameter } from '@asyncapi/parser';

/**
* Component which returns a subscribe to function for the client
*
* @param {string} defaultContentType
* @param {string} channelName to publish to
* @param {Message} message which is being received
* @param {string} messageDescription
* @param {Object.<string, ChannelParameter>} channelParameters parameters to the channel
*/
export function JetstreamPullSubscribe(channelName, message, messageDescription, channelParameters) {
return `
/**
* Push subscription to the \`${channelName}\`
*
* ${messageDescription}
*
* @param onDataCallback to call when messages are received
${renderJSDocParameters(channelParameters)}
* @param flush ensure client is force flushed after subscribing
* @param options to subscribe with, bindings from the AsyncAPI document overwrite these if specified
*/
public jetStreamPullSubscribeTo${pascalCase(channelName)}(
onDataCallback: (
err ? : NatsTypescriptTemplateError,
msg?: ${getMessageType(message)}
${realizeParametersForChannelWrapper(channelParameters, false)},
jetstreamMsg?: Nats.JsMsg) => void
${realizeParametersForChannelWrapper(channelParameters)},
options: Nats.ConsumerOptsBuilder | Partial<Nats.ConsumerOpts>
): Promise<Nats.JetStreamPullSubscription> {
return new Promise(async (resolve, reject) => {
if (!this.isClosed() && this.nc !== undefined && this.codec !== undefined && this.js !== undefined) {
try {
const sub = ${camelCase(channelName)}Channel.jetStreamPullSubscribe(
onDataCallback,
this.js,
this.codec
${Object.keys(channelParameters).length ? ` ,${realizeParametersForChannelWithoutType(channelParameters)}` : ''},
options
);
resolve(sub);
} catch (e: any) {
reject(e);
}
} else {
reject(NatsTypescriptTemplateError.errorForCode(ErrorCode.NOT_CONNECTED));
}
});
}
`;
}
17 changes: 17 additions & 0 deletions examples/simple-publish/asyncapi-nats-client/API.md
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ The test/mirror client which is the reverse to the normal NatsAsyncApiClient.
* [.subscribeToStreetlightStreetlightIdCommandTurnon(onDataCallback, streetlight_id, flush, options)](#NatsAsyncApiTestClient+subscribeToStreetlightStreetlightIdCommandTurnon)
* [.jetStreamPullStreetlightStreetlightIdCommandTurnon(onDataCallback, streetlight_id, options)](#NatsAsyncApiTestClient+jetStreamPullStreetlightStreetlightIdCommandTurnon)
* [.jetStreamPushSubscribeToStreetlightStreetlightIdCommandTurnon(onDataCallback, streetlight_id, flush, options)](#NatsAsyncApiTestClient+jetStreamPushSubscribeToStreetlightStreetlightIdCommandTurnon)
* [.jetStreamPullSubscribeToStreetlightStreetlightIdCommandTurnon(onDataCallback, streetlight_id, flush, options)](#NatsAsyncApiTestClient+jetStreamPullSubscribeToStreetlightStreetlightIdCommandTurnon)

<a name="NatsAsyncApiTestClient+connect"></a>

Expand Down Expand Up @@ -313,3 +314,19 @@ Channel for the turn on command which should turn on the streetlight
| flush | ensure client is force flushed after subscribing |
| options | to subscribe with, bindings from the AsyncAPI document overwrite these if specified |

<a name="NatsAsyncApiTestClient+jetStreamPullSubscribeToStreetlightStreetlightIdCommandTurnon"></a>

### natsAsyncApiTestClient.jetStreamPullSubscribeToStreetlightStreetlightIdCommandTurnon(onDataCallback, streetlight_id, flush, options)
Push subscription to the `streetlight/{streetlight_id}/command/turnon`

Channel for the turn on command which should turn on the streetlight

**Kind**: instance method of [<code>NatsAsyncApiTestClient</code>](#NatsAsyncApiTestClient)

| Param | Description |
| --- | --- |
| onDataCallback | to call when messages are received |
| streetlight_id | parameter to use in topic |
| flush | ensure client is force flushed after subscribing |
| options | to subscribe with, bindings from the AsyncAPI document overwrite these if specified |

Original file line number Diff line number Diff line change
Expand Up @@ -210,4 +210,39 @@ export class NatsAsyncApiTestClient {
}
});
}
/**
* Push subscription to the `streetlight/{streetlight_id}/command/turnon`
*
* Channel for the turn on command which should turn on the streetlight
*
* @param onDataCallback to call when messages are received
* @param streetlight_id parameter to use in topic
* @param flush ensure client is force flushed after subscribing
* @param options to subscribe with, bindings from the AsyncAPI document overwrite these if specified
*/
public jetStreamPullSubscribeToStreetlightStreetlightIdCommandTurnon(
onDataCallback: (
err ? : NatsTypescriptTemplateError,
msg ? : TurnOn, streetlight_id ? : string,
jetstreamMsg ? : Nats.JsMsg) => void, streetlight_id: string,
options: Nats.ConsumerOptsBuilder | Partial < Nats.ConsumerOpts >
): Promise < Nats.JetStreamPullSubscription > {
return new Promise(async (resolve, reject) => {
if (!this.isClosed() && this.nc !== undefined && this.codec !== undefined && this.js !== undefined) {
try {
const sub = streetlightStreetlightIdCommandTurnonChannel.jetStreamPullSubscribe(
onDataCallback,
this.js,
this.codec, streetlight_id,
options
);
resolve(sub);
} catch (e: any) {
reject(e);
}
} else {
reject(NatsTypescriptTemplateError.errorForCode(ErrorCode.NOT_CONNECTED));
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,4 +130,46 @@ export function jetStreamPushSubscribe(
reject(NatsTypescriptTemplateError.errorForCode(ErrorCode.INTERNAL_NATS_TS_ERROR, e));
}
})
}
/**
* Internal functionality to setup jetstream pull subscription on the `streetlight/{streetlight_id}/command/turnon` channel
*
* @param onDataCallback to call when messages are received
* @param nc to subscribe with
* @param codec used to convert messages
* @param streetlight_id parameter to use in topic
*/
export function jetStreamPullSubscribe(
onDataCallback: (
err ? : NatsTypescriptTemplateError,
msg ? : TurnOn, streetlight_id ? : string,
jetstreamMsg ? : Nats.JsMsg) => void,
js: Nats.JetStreamClient,
codec: Nats.Codec < any > , streetlight_id: string,
options: Nats.ConsumerOptsBuilder | Partial < Nats.ConsumerOpts >
): Promise < Nats.JetStreamPullSubscription > {
return new Promise(async (resolve, reject) => {
try {
const subscription = await js.pullSubscribe(`streetlight.${streetlight_id}.command.turnon`, options);
(async () => {
for await (const msg of subscription) {
const unmodifiedChannel = `streetlight.{streetlight_id}.command.turnon`;
let channel = msg.subject;
const streetlightIdSplit = unmodifiedChannel.split("{streetlight_id}");
const splits = [
streetlightIdSplit[0],
streetlightIdSplit[1]
];
channel = channel.substring(splits[0].length);
const streetlightIdEnd = channel.indexOf(splits[1]);
const streetlightIdParam = "" + channel.substring(0, streetlightIdEnd);
let receivedData: any = codec.decode(msg.data);
onDataCallback(undefined, TurnOn.unmarshal(receivedData), streetlightIdParam);
}
})();
resolve(subscription);
} catch (e: any) {
reject(NatsTypescriptTemplateError.errorForCode(ErrorCode.INTERNAL_NATS_TS_ERROR, e));
}
})
}
32 changes: 32 additions & 0 deletions examples/simple-subscribe/asyncapi-nats-client/API.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ Module which wraps functionality for the `streetlight/{streetlight_id}/command/t
* [~subscribe(onDataCallback, nc, codec, streetlight_id, options)](#module_streetlightStreetlightIdCommandTurnon..subscribe)
* [~jetStreamPull(onDataCallback, js, codec, streetlight_id)](#module_streetlightStreetlightIdCommandTurnon..jetStreamPull)
* [~jetStreamPushSubscribe(onDataCallback, nc, codec, streetlight_id, options)](#module_streetlightStreetlightIdCommandTurnon..jetStreamPushSubscribe)
* [~jetStreamPullSubscribe(onDataCallback, nc, codec, streetlight_id)](#module_streetlightStreetlightIdCommandTurnon..jetStreamPullSubscribe)

<a name="module_streetlightStreetlightIdCommandTurnon..subscribe"></a>

Expand Down Expand Up @@ -74,6 +75,20 @@ Internal functionality to setup jetstream push subscription on the `streetlight/
| streetlight_id | parameter to use in topic |
| options | to subscribe with, bindings from the AsyncAPI document overwrite these if specified |

<a name="module_streetlightStreetlightIdCommandTurnon..jetStreamPullSubscribe"></a>

### streetlightStreetlightIdCommandTurnon~jetStreamPullSubscribe(onDataCallback, nc, codec, streetlight_id)
Internal functionality to setup jetstream pull subscription on the `streetlight/{streetlight_id}/command/turnon` channel

**Kind**: inner method of [<code>streetlightStreetlightIdCommandTurnon</code>](#module_streetlightStreetlightIdCommandTurnon)

| Param | Description |
| --- | --- |
| onDataCallback | to call when messages are received |
| nc | to subscribe with |
| codec | used to convert messages |
| streetlight_id | parameter to use in topic |

<a name="NatsAsyncApiClient"></a>

## NatsAsyncApiClient
Expand All @@ -94,6 +109,7 @@ The generated client based on your AsyncAPI document.
* [.subscribeToStreetlightStreetlightIdCommandTurnon(onDataCallback, streetlight_id, flush, options)](#NatsAsyncApiClient+subscribeToStreetlightStreetlightIdCommandTurnon)
* [.jetStreamPullStreetlightStreetlightIdCommandTurnon(onDataCallback, streetlight_id, options)](#NatsAsyncApiClient+jetStreamPullStreetlightStreetlightIdCommandTurnon)
* [.jetStreamPushSubscribeToStreetlightStreetlightIdCommandTurnon(onDataCallback, streetlight_id, flush, options)](#NatsAsyncApiClient+jetStreamPushSubscribeToStreetlightStreetlightIdCommandTurnon)
* [.jetStreamPullSubscribeToStreetlightStreetlightIdCommandTurnon(onDataCallback, streetlight_id, flush, options)](#NatsAsyncApiClient+jetStreamPullSubscribeToStreetlightStreetlightIdCommandTurnon)

<a name="NatsAsyncApiClient+connect"></a>

Expand Down Expand Up @@ -211,6 +227,22 @@ Channel for the turn on command which should turn on the streetlight
| flush | ensure client is force flushed after subscribing |
| options | to subscribe with, bindings from the AsyncAPI document overwrite these if specified |

<a name="NatsAsyncApiClient+jetStreamPullSubscribeToStreetlightStreetlightIdCommandTurnon"></a>

### natsAsyncApiClient.jetStreamPullSubscribeToStreetlightStreetlightIdCommandTurnon(onDataCallback, streetlight_id, flush, options)
Push subscription to the `streetlight/{streetlight_id}/command/turnon`

Channel for the turn on command which should turn on the streetlight

**Kind**: instance method of [<code>NatsAsyncApiClient</code>](#NatsAsyncApiClient)

| Param | Description |
| --- | --- |
| onDataCallback | to call when messages are received |
| streetlight_id | parameter to use in topic |
| flush | ensure client is force flushed after subscribing |
| options | to subscribe with, bindings from the AsyncAPI document overwrite these if specified |

<a name="NatsAsyncApiTestClient"></a>

## NatsAsyncApiTestClient
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,4 +130,46 @@ export function jetStreamPushSubscribe(
reject(NatsTypescriptTemplateError.errorForCode(ErrorCode.INTERNAL_NATS_TS_ERROR, e));
}
})
}
/**
* Internal functionality to setup jetstream pull subscription on the `streetlight/{streetlight_id}/command/turnon` channel
*
* @param onDataCallback to call when messages are received
* @param nc to subscribe with
* @param codec used to convert messages
* @param streetlight_id parameter to use in topic
*/
export function jetStreamPullSubscribe(
onDataCallback: (
err ? : NatsTypescriptTemplateError,
msg ? : TurnOn, streetlight_id ? : string,
jetstreamMsg ? : Nats.JsMsg) => void,
js: Nats.JetStreamClient,
codec: Nats.Codec < any > , streetlight_id: string,
options: Nats.ConsumerOptsBuilder | Partial < Nats.ConsumerOpts >
): Promise < Nats.JetStreamPullSubscription > {
return new Promise(async (resolve, reject) => {
try {
const subscription = await js.pullSubscribe(`streetlight.${streetlight_id}.command.turnon`, options);
(async () => {
for await (const msg of subscription) {
const unmodifiedChannel = `streetlight.{streetlight_id}.command.turnon`;
let channel = msg.subject;
const streetlightIdSplit = unmodifiedChannel.split("{streetlight_id}");
const splits = [
streetlightIdSplit[0],
streetlightIdSplit[1]
];
channel = channel.substring(splits[0].length);
const streetlightIdEnd = channel.indexOf(splits[1]);
const streetlightIdParam = "" + channel.substring(0, streetlightIdEnd);
let receivedData: any = codec.decode(msg.data);
onDataCallback(undefined, TurnOn.unmarshal(receivedData), streetlightIdParam);
}
})();
resolve(subscription);
} catch (e: any) {
reject(NatsTypescriptTemplateError.errorForCode(ErrorCode.INTERNAL_NATS_TS_ERROR, e));
}
})
}
35 changes: 35 additions & 0 deletions examples/simple-subscribe/asyncapi-nats-client/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -218,4 +218,39 @@ export class NatsAsyncApiClient {
}
});
}
/**
* Push subscription to the `streetlight/{streetlight_id}/command/turnon`
*
* Channel for the turn on command which should turn on the streetlight
*
* @param onDataCallback to call when messages are received
* @param streetlight_id parameter to use in topic
* @param flush ensure client is force flushed after subscribing
* @param options to subscribe with, bindings from the AsyncAPI document overwrite these if specified
*/
public jetStreamPullSubscribeToStreetlightStreetlightIdCommandTurnon(
onDataCallback: (
err ? : NatsTypescriptTemplateError,
msg ? : TurnOn, streetlight_id ? : string,
jetstreamMsg ? : Nats.JsMsg) => void, streetlight_id: string,
options: Nats.ConsumerOptsBuilder | Partial < Nats.ConsumerOpts >
): Promise < Nats.JetStreamPullSubscription > {
return new Promise(async (resolve, reject) => {
if (!this.isClosed() && this.nc !== undefined && this.codec !== undefined && this.js !== undefined) {
try {
const sub = streetlightStreetlightIdCommandTurnonChannel.jetStreamPullSubscribe(
onDataCallback,
this.js,
this.codec, streetlight_id,
options
);
resolve(sub);
} catch (e: any) {
reject(e);
}
} else {
reject(NatsTypescriptTemplateError.errorForCode(ErrorCode.NOT_CONNECTED));
}
});
}
}