Skip to content

Commit

Permalink
Merge pull request #29 from AckeeCZ/feat/pubsub-ordering
Browse files Browse the repository at this point in the history
✨ Add support for PubSub ordering key
  • Loading branch information
vlasy committed May 10, 2021
2 parents 92cecec + b70dfcf commit 0e1ad0c
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 7 deletions.
4 changes: 2 additions & 2 deletions src/lib/adapters/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ export const fuQuPubSub: FuQuCreator<FuQuPubSubOptions, Message> = (pubSub: PubS
name: 'pubsub',
isAlive: () => subscription.then(s => s.isOpen),
close: () => subscription.then(s => s.close()),
publishJson: async (payload, attributes) => {
await (await topic).publishJSON(payload, attributes);
publishJson: async (payload, attributes, publishOptions) => {
await (await topic).publishMessage({ attributes, json: payload, orderingKey: publishOptions?.orderingKey })
},
registerHandler: async handler => {
const sub = await subscription;
Expand Down
2 changes: 1 addition & 1 deletion src/lib/fuqu.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ describe('Fuqu', () => {
expect(res.handlerParams).toMatchInlineSnapshot(
'"[{ foo: number; }, { v: string; }, { data: string; meta: Record<string, string>; }]"'
);
expect(res.publishParams).toMatchInlineSnapshot('"[{ foo: number; }, ({ v: string; } | undefined)?]"');
expect(res.publishParams).toMatchInlineSnapshot('"[{ foo: number; }, ({ v: string; } | undefined)?, ({ [key: string]: any; } | undefined)?]"');
});
});
2 changes: 1 addition & 1 deletion src/lib/fuqu.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ export interface FuQu<P, A, M> {
* @param payload Object-like message payload
* @param attributes Optional message attributes
*/
publish: (payload: P, attributes?: A) => Promise<void>;
publish: (payload: P, attributes?: A, options?: {[key: string]: any}) => Promise<void>;
/**
* Subscribe a message receiver
* @param handler Async handler that is given data and original message.
Expand Down
6 changes: 3 additions & 3 deletions src/lib/fuquAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export type FuQuCreator<O extends FuQuOptions<any, any>, Message> = <

export interface FuQuAdapter<P, A, M> {
name: string;
publishJson: (payload: P, attributes?: A) => Promise<void>;
publishJson: (payload: P, attributes?: A, options?: {[key: string]: any}) => Promise<void>;
registerHandler: (handler: Handler<P, A, M>) => Promise<void> | void;
ack: (message: M) => Promise<void> | void;
nack: (message: M) => Promise<void> | void;
Expand Down Expand Up @@ -39,8 +39,8 @@ export const createFuQu = <P, A, M>(
};
log({ options, action: 'create' });
return {
publish: async (payload, attributes) => {
await adapter.publishJson(payload, attributes);
publish: async (payload, attributes, publishOptions) => {
await adapter.publishJson(payload, attributes, publishOptions);
log({ payload, attributes, action: 'publish' });
},
subscribe: async handler => {
Expand Down

0 comments on commit 0e1ad0c

Please sign in to comment.