-
-
Notifications
You must be signed in to change notification settings - Fork 62
/
adapter.js
142 lines (120 loc) · 3.71 KB
/
adapter.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import EventEmitter from 'events'
import uriTemplates from 'uri-templates'
import GleeConnection from './connection.js'
class GleeAdapter extends EventEmitter {
/**
* Instantiates a Glee adapter.
*
* @param {Glee} glee A reference to the Glee app.
* @param {String} serverName The name of the AsyncAPI server to use for the connection.
* @param {AsyncAPIServer} server The AsyncAPI server to use for the connection.
* @param {AsyncAPIDocument} parsedAsyncAPI The AsyncAPI document.
*/
constructor (glee, serverName, server, parsedAsyncAPI) {
super()
this.glee = glee
this.serverName = serverName
this.AsyncAPIServer = server
this.parsedAsyncAPI = parsedAsyncAPI
this.channelNames = this.parsedAsyncAPI.channelNames()
this.connections = []
const uriTemplateValues = {}
process.env.GLEE_SERVER_VARIABLES?.split(',').forEach(t => {
const [localServerName, variable, value] = t.split(':')
if (localServerName === this.serverName) uriTemplateValues[variable] = value
})
this.serverUrlExpanded = uriTemplates(this.AsyncAPIServer.url()).fill(uriTemplateValues)
this.on('error', err => { this.glee.injectError(err) })
this.on('message', (message, connection) => {
const conn = new GleeConnection({
connection,
channels: this.connections.find(c => c.rawConnection === connection).channels,
serverName,
server,
parsedAsyncAPI,
})
this.glee.injectMessage(message, serverName, conn)
})
function enrichEvent(ev) {
return {
...ev,
...{
serverName,
server,
}
}
}
function createConnection(ev) {
let channels = ev.channels
if (!channels && ev.channel) channels = [ev.channel]
return new GleeConnection({
connection: ev.connection,
channels,
serverName,
server,
parsedAsyncAPI,
})
}
this.on('connect', (ev) => {
const conn = createConnection(ev)
this.connections.push(conn)
this.glee.emit('adapter:connect', enrichEvent({
connection: conn,
}))
})
this.on('server:ready', (ev) => {
this.glee.emit('adapter:server:ready', enrichEvent(ev))
})
this.on('server:connection:open', (ev) => {
const conn = createConnection(ev)
this.connections.push(conn)
this.glee.emit('adapter:server:connection:open', enrichEvent({
connection: conn,
}))
})
this.on('reconnect', (ev) => {
const conn = createConnection(ev)
this.glee.emit('adapter:reconnect', enrichEvent({
connection: conn,
}))
})
this.on('close', (ev) => {
const conn = createConnection(ev)
this.glee.emit('adapter:close', enrichEvent({
connection: conn,
}))
})
}
/**
* Returns a list of the channels a given adapter has to subscribe to.
*
* @return {Promise}
*/
getSubscribedChannels() {
return this.channelNames
.filter(channelName => {
const channel = this.parsedAsyncAPI.channel(channelName)
if (!channel.hasPublish()) return false
const channelServers = channel.publish().ext('x-servers') || this.parsedAsyncAPI.serverNames()
return channelServers.includes(this.serverName)
})
}
/**
* Connects to the remote server.
*
* @return {Promise}
*/
async connect () {
throw new Error('Method `connect` is not implemented.')
}
/**
* Sends a message to the remote server.
*
* @param {GleeMessage} message The message to send.
* @return {Promise}
*/
async send (message) {
throw new Error('Method `send` is not implemented.')
}
}
export default GleeAdapter