/
subscription.ts
121 lines (104 loc) · 3.38 KB
/
subscription.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import { resolve } from 'path';
import {
Subscribers,
SubscriberV1,
SubscriberV2,
SubscriberObject,
SubscriberTuple,
SubscriberOptions,
} from '../subscriber';
import SubscriberLoader from './subscriberLoader';
import { ResourceResolver } from './resourceResolver';
import { Logger } from './logger';
/**
 * Base subscription service.
 *
 * Holds the statically registered subscribers and lazily builds the merged
 * list of subscribers obtained from the project's subscription service file
 * and from the subscriptions directory (both located through
 * `ResourceResolver.getFiles()`).
 *
 * Applications may provide their own default-exported service class; it is
 * picked up by `loadSubscriptionService()`.
 */
export default class SubscriptionService {
  /**
   * Subscribers registered directly on the service, either as subscriber
   * classes or as plain subscriber objects.
   * NOTE(review): not read in this class; presumably consumed by
   * SubscriberLoader — confirm.
   */
  public static subscribers: (
    | typeof SubscriberV1
    | typeof SubscriberV2
    | SubscriberObject
  )[] = [];

  /** Cache of the merged subscriber list, filled by `loadSubscribers()`. */
  private static _subscribers: Subscribers = [];

  /** Options handed to the loader as defaults for every loaded subscriber. */
  public static defaultSubscriberOptions: SubscriberOptions;

  /** Eagerly created instance; constructing it runs the environment check. */
  public static instance = new SubscriptionService();

  /**
   * Warns (does not throw) when `PUBSUB_ROOT_DIR` is missing from the
   * environment.
   */
  public constructor() {
    this.checkExistence(process.env, 'PUBSUB_ROOT_DIR');
  }

  /**
   * Logs a warning when `property` is not usable on `object`.
   *
   * A property counts as missing when `object` itself is null/undefined, when
   * it is not an own property, or when its value is `undefined`, `null` or the
   * empty string.
   *
   * @param object - Object to inspect (typically `process.env`).
   * @param property - Name of the required property.
   */
  protected checkExistence(object: any, property: string): void {
    // Call through Object.prototype so that null-prototype objects (which have
    // no `hasOwnProperty` method of their own) do not make this check throw.
    const isOwn =
      object != null &&
      Object.prototype.hasOwnProperty.call(object, property);
    const value = isOwn ? object[property] : undefined;

    if (value == null || value === '') {
      Logger.Instance.warn(
        `@honestfoodcompany/pubsub module requires ${property} to be defined in your .env`,
      );
    }
  }

  /**
   * Initialization hook. The base implementation does nothing; it exists so
   * that custom services can override it.
   */
  public static async init(): Promise<void> {
    // intentionally empty
  }

  /**
   * Default error handler at SubscriptionService level.
   * Applications should override this with custom error handling.
   *
   * @param error - The unexpected error that was received.
   * @throws Always rethrows `error` after logging it.
   */
  public static handleError(error: Error): void {
    // default error handling logic
    Logger.Instance.error({ error }, 'Received Unexpected Error');
    // To keep backwards compatibility with no error handler
    throw error;
  }

  /**
   * Call this function from a process exit handler to close all current
   * subscriptions. The base implementation does nothing.
   */
  public static async closeAll(): Promise<void> {
    // intentionally empty
  }

  /**
   * Returns the merged subscriber list, loading it on first use.
   *
   * Only a non-empty result is treated as cached: while no subscribers have
   * been found, every call triggers a fresh load.
   */
  public static getSubscribers(): Subscribers {
    if (SubscriptionService._subscribers.length === 0) {
      SubscriptionService.loadSubscribers();
    }
    return SubscriptionService._subscribers;
  }

  /**
   * Loads subscribers from the subscription service and from the
   * subscriptions directory, merges them, stores the result in the cache and
   * returns it.
   */
  private static loadSubscribers(): Subscribers {
    const [subscriptionService, pubsubSubscriptionsDir] =
      ResourceResolver.getFiles();
    const subscriptionServiceClass =
      SubscriptionService.loadSubscriptionService();
    const loader = new SubscriberLoader();

    SubscriptionService._subscribers = this.mergeSubscribers(
      loader.loadSubscribersFromService(
        subscriptionService,
        subscriptionServiceClass.defaultSubscriberOptions,
      ),
      loader.loadSubscribersFromDirectory(
        pubsubSubscriptionsDir,
        subscriptionServiceClass.defaultSubscriberOptions,
      ),
    );
    return SubscriptionService._subscribers;
  }

  /**
   * Combines both subscriber lists, keeping one entry per
   * (topicName, subscriptionName) pair.
   *
   * When the same pair appears more than once the later entry wins, so a
   * subscriber from the directory replaces one from the service. The result
   * keeps the position of the first occurrence of each pair.
   *
   * @param subscribersFromService - Subscribers loaded from the service.
   * @param subscribersFromDirectory - Subscribers loaded from the directory.
   */
  private static mergeSubscribers(
    subscribersFromService: Subscribers,
    subscribersFromDirectory: Subscribers,
  ): Subscribers {
    const byKey = new Map<string, SubscriberTuple>();

    for (const subscriber of subscribersFromService.concat(
      subscribersFromDirectory,
    )) {
      const [, options] = subscriber;
      // Serialize the pair instead of concatenating the two names: plain
      // concatenation makes ("ab", "c") and ("a", "bc") share a key, which
      // would silently drop one of the subscribers.
      const subscriptionKey = JSON.stringify([
        options.topicName,
        options.subscriptionName,
      ]);
      byKey.set(subscriptionKey, subscriber);
    }

    return Array.from(byKey.values());
  }

  /**
   * Resolves the service class to use.
   *
   * Tries to `require` the subscription service file reported by
   * `ResourceResolver.getFiles()` and returns its default export. Falls back
   * to this base class when the file cannot be loaded or has no default
   * export.
   */
  public static loadSubscriptionService(): typeof SubscriptionService {
    const [subscriptionService] = ResourceResolver.getFiles();
    try {
      const service = require(resolve(subscriptionService)).default;
      // A module without a default export yields undefined; returning that
      // would make callers crash when reading static members from it.
      return service != null ? service : SubscriptionService;
    } catch (e) {
      // Deliberate best effort: any failure to load the custom service file
      // means the base service is used.
      return SubscriptionService;
    }
  }
}