Skip to content

Commit c5287c7

Browse files
committed
fix: timeout error despite topic publishing messages
1 parent 027997f commit c5287c7

8 files changed

+134
-35
lines changed

src/pubsub-publisher/pubsub-publisher.module.spec.ts

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ describe('PubSubPublisherModule', () => {
3838
const publishJsonMock = jest
3939
.spyOn(topic, 'publishMessage')
4040
.mockResolvedValue(mockedRes as never);
41-
const clearTimeoutMock = jest.spyOn(global, 'clearTimeout');
4241

4342
const data = {
4443
a: 1,
@@ -51,7 +50,6 @@ describe('PubSubPublisherModule', () => {
5150

5251
expect(res).toBe(mockedRes[0]);
5352
expect(publishJsonMock).toBeCalledWith({ data: Buffer.from(JSON.stringify(data)), attributes });
54-
expect(clearTimeoutMock).toBeCalled();
5553
});
5654

5755
it('publishes to topic raw buffer', async () => {
@@ -63,7 +61,6 @@ describe('PubSubPublisherModule', () => {
6361
const publishJsonMock = jest
6462
.spyOn(topic, 'publishMessage')
6563
.mockResolvedValue(mockedRes as never);
66-
const clearTimeoutMock = jest.spyOn(global, 'clearTimeout');
6764

6865
const data = Buffer.from('hello world');
6966
const attributes = {
@@ -74,7 +71,6 @@ describe('PubSubPublisherModule', () => {
7471

7572
expect(res).toBe(mockedRes[0]);
7673
expect(publishJsonMock).toBeCalledWith({ data, attributes });
77-
expect(clearTimeoutMock).toBeCalled();
7874
});
7975

8076
it('rejects with PubSubTimeoutError if publishing hangs', async () => {

src/pubsub-publisher/pubsub-publisher.service.ts

Lines changed: 1 addition & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -37,36 +37,11 @@ export class PubSubPublisherService {
3737
data: Buffer,
3838
attributes: Record<string, string>
3939
): Promise<string> {
40-
const timeout = setTimeoutAsync(this.settings.requestTimeoutMillis);
41-
42-
const [result] = await Promise.race([
43-
topic.publishMessage({ data, attributes }),
44-
timeout.then(() => {
45-
throw new PubSubTimeoutError();
46-
}),
47-
]);
48-
49-
timeout.clear();
50-
40+
const [result] = await topic.publishMessage({ data, attributes });
5141
return result;
5242
}
5343
}
5444

55-
function setTimeoutAsync(millis: number) {
56-
let timeoutResolve: () => void;
57-
let timeout: NodeJS.Timeout;
58-
const promise = new Promise<void>((resolve) => {
59-
timeoutResolve = resolve;
60-
timeout = setTimeout(() => resolve(), millis);
61-
});
62-
return Object.assign(promise, {
63-
clear: () => {
64-
clearTimeout(timeout);
65-
timeoutResolve();
66-
},
67-
});
68-
}
69-
7045
export class PubSubTimeoutError extends InternalServerErrorException {
7146
constructor() {
7247
super('Request timeout');

src/pubsub/pubsub.module.ts

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { PubSubService } from './pubsub.service';
66
import { Token } from '../utils/token';
77
import { AsyncSettings, SettingsProvider } from '../utils/settings-provider';
88
import { credentials } from '@grpc/grpc-js';
9+
import { PubSubPublisherSettings } from '../pubsub-publisher/pubsub-publisher.module';
910

1011
export const PubSubSettings = Symbol('PubSubSettings');
1112
export type PubSubSettings = PubSubInstanceOrSettings & {
@@ -78,6 +79,10 @@ export class PubSubModule {
7879
module: PubSubModule,
7980
global: true,
8081
providers: [
82+
<SettingsProvider<PubSubPublisherSettings>>{
83+
provide: PubSubPublisherSettings,
84+
...settings,
85+
},
8186
<SettingsProvider<PubSubSettings>>{
8287
provide: PubSubSettings,
8388
...settings,
@@ -89,9 +94,12 @@ export class PubSubModule {
8994
},
9095
{
9196
provide: PubSubService,
92-
inject: [PubSub, PubSubSettings],
93-
useFactory: (pubSub: PubSub, settings: PubSubSettings) =>
94-
new PubSubService(pubSub, settings),
97+
inject: [PubSub, PubSubSettings, PubSubPublisherSettings],
98+
useFactory: (
99+
pubSub: PubSub,
100+
settings: PubSubSettings,
101+
timeoutMillis: PubSubPublisherSettings
102+
) => new PubSubService(pubSub, settings, timeoutMillis),
95103
},
96104
],
97105
exports: [PubSubService],

src/pubsub/pubsub.service.ts

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@ import { Topic, Subscription, PubSub } from '@google-cloud/pubsub';
22
import { Injectable, OnModuleDestroy } from '@nestjs/common';
33
import { PubSubSettings, SubscriptionSettings, TopicSettings } from './pubsub.module';
44
import { Token } from '../utils/token';
5+
import { defaultRetryOptions } from '../utils/default-retry-options';
6+
import { PubSubPublisherSettings } from '../pubsub-publisher/pubsub-publisher.module';
7+
import { mergeObjects } from '../utils/deep-merge-object';
58

69
@Injectable()
710
export class PubSubService implements OnModuleDestroy {
@@ -11,7 +14,11 @@ export class PubSubService implements OnModuleDestroy {
1114
private openTopics: Record<Token, Topic> = {};
1215
private openSubscriptions: Record<Token, Subscription> = {};
1316

14-
constructor(public readonly pubSub: PubSub, private settings: PubSubSettings) {
17+
constructor(
18+
public readonly pubSub: PubSub,
19+
private settings: PubSubSettings,
20+
private timeoutMillis: PubSubPublisherSettings
21+
) {
1522
this.topicsSettings = settings.topics;
1623
this.subscriptionsSettings = createSubscriptionsStore(settings.topics);
1724
}
@@ -23,7 +30,11 @@ export class PubSubService implements OnModuleDestroy {
2330
throw new Error(`Cannot find topic by alias: ${String(token)}`);
2431
}
2532

26-
const { name, options } = topicSettings;
33+
const { name, options: userOptions } = topicSettings;
34+
const options = mergeObjects(
35+
defaultRetryOptions(this.timeoutMillis.requestTimeoutMillis),
36+
userOptions as Record<string, unknown>
37+
);
2738
return this.pubSub.topic(name, options);
2839
});
2940
}

src/utils/deep-merge-object.ts

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
export const mergeObjects = (
2+
obj1: Record<string, unknown>,
3+
obj2: Record<string, unknown> = {}
4+
): Record<string, unknown> => {
5+
// create a new object that will be the merged version of obj1 and obj2
6+
const mergedObj: Record<string, unknown> = {};
7+
8+
// loop through all the keys in obj1
9+
for (const key of Object.keys(obj1)) {
10+
// if the key is not present in obj2, or if the value at that key is not an object,
11+
// add the key-value pair to the merged object
12+
if (!obj2.hasOwnProperty(key) || typeof obj1[key] !== 'object') {
13+
mergedObj[key] = obj1[key];
14+
} else {
15+
// if the value at the key in obj1 is an object and the same key exists in obj2,
16+
// merge the objects and add the merged object to the mergedObj
17+
mergedObj[key] = mergeObjects(
18+
obj1[key] as Record<string, unknown>,
19+
obj2[key] as Record<string, unknown>
20+
);
21+
}
22+
}
23+
24+
// loop through all the keys in obj2
25+
for (const key of Object.keys(obj2)) {
26+
// if the key is not present in obj1 or if the value at that key is not an object,
27+
// add the key-value pair to the merged object
28+
if (!obj1.hasOwnProperty(key) || typeof obj2[key] !== 'object') {
29+
mergedObj[key] = obj2[key];
30+
} else {
31+
// if the value at the key in obj2 is an object and the same key exists in obj1,
32+
// merge the objects and add the merged object to the mergedObj
33+
mergedObj[key] = mergeObjects(
34+
obj1[key] as Record<string, unknown>,
35+
obj2[key] as Record<string, unknown>
36+
);
37+
}
38+
}
39+
40+
return mergedObj;
41+
};
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
import { mergeObjects } from './deep-merge-object';
2+
3+
describe('deep merge two objects', () => {
4+
it('merges two objects with nested values', () => {
5+
const retryDefaults = {
6+
prop1: 'prop should stay',
7+
retry: {
8+
backoffSettings: {
9+
prop2: 'prop should stay',
10+
timeoutInMillis: 1,
11+
},
12+
},
13+
};
14+
const options = {
15+
prop3: 'prop should stay',
16+
retry: {
17+
backoffSettings: {
18+
timeoutInMillis: 100,
19+
prop4: 'prop should stay',
20+
},
21+
},
22+
};
23+
expect(mergeObjects(retryDefaults, options)).toEqual({
24+
prop1: 'prop should stay',
25+
prop3: 'prop should stay',
26+
retry: {
27+
backoffSettings: {
28+
timeoutInMillis: 100,
29+
prop2: 'prop should stay',
30+
prop4: 'prop should stay',
31+
},
32+
},
33+
});
34+
});
35+
});
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
import { defaultRetryOptions } from './default-retry-options';
2+
3+
describe('get default retry options', () => {
4+
it('returns default retry options with passed timeout in milliseconds', () => {
5+
const timeoutMillis = 100;
6+
expect(defaultRetryOptions(timeoutMillis)).toEqual({
7+
gaxOpts: {
8+
retry: {
9+
backoffSettings: {
10+
initialRetryDelayMillis: 1,
11+
retryDelayMultiplier: 1.3,
12+
maxRetryDelayMillis: 100,
13+
totalTimeoutMillis: timeoutMillis,
14+
},
15+
},
16+
},
17+
});
18+
});
19+
});

src/utils/default-retry-options.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
export type DefaultRetryOptions = Record<string, unknown>;
2+
3+
export const defaultRetryOptions = (timeoutMillis: number): DefaultRetryOptions => ({
4+
gaxOpts: {
5+
retry: {
6+
backoffSettings: {
7+
initialRetryDelayMillis: 1,
8+
retryDelayMultiplier: 1.3,
9+
maxRetryDelayMillis: 100,
10+
totalTimeoutMillis: timeoutMillis,
11+
},
12+
},
13+
},
14+
});

0 commit comments

Comments
 (0)