-
Notifications
You must be signed in to change notification settings - Fork 106
/
middlewares.js
221 lines (194 loc) · 9.17 KB
/
middlewares.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
const axios = require('axios')
const { sendResponse, errorResponse, errorResponseUnauthorized, errorResponseServerError } = require('./apiHelpers')
const config = require('./config')
const sessionManager = require('./sessionManager')
const models = require('./models')
const utils = require('./utils')
/** Ensure valid cnodeUser and session exist for provided session token. */
async function authMiddleware (req, res, next) {
// Get session token
const sessionToken = req.get(sessionManager.sessionTokenHeader)
if (!sessionToken) {
return sendResponse(req, res, errorResponseUnauthorized('Authentication token not provided'))
}
// Ensure session exists for session token
const cnodeUserUUID = await sessionManager.verifySession(sessionToken)
if (!cnodeUserUUID) {
return sendResponse(req, res, errorResponseUnauthorized('Invalid authentication token'))
}
// Ensure cnodeUser exists for session
const cnodeUser = await models.CNodeUser.findOne({ where: { cnodeUserUUID } })
if (!cnodeUser) {
return sendResponse(req, res, errorResponseUnauthorized('No node user exists for provided authentication token'))
}
// Attach session object to request
req.session = {
cnodeUser: cnodeUser,
wallet: cnodeUser.walletPublicKey,
cnodeUserUUID: cnodeUserUUID
}
next()
}
/** Ensure resource write access */
async function syncLockMiddleware (req, res, next) {
if (req.session && req.session.wallet) {
const redisClient = req.app.get('redisClient')
const redisKey = redisClient.getNodeSyncRedisKey(req.session.wallet)
const lockHeld = await redisClient.lock.getLock(redisKey)
if (lockHeld) {
return sendResponse(req, res, errorResponse(423,
`Cannot change state of wallet ${req.session.wallet}. Node sync currently in progress.`
))
}
}
next()
}
/** Blocks writes if node is not the primary for audiusUser associated with wallet. */
async function ensurePrimaryMiddleware (req, res, next) {
if (config.get('isUserMetadataNode')) next()
const start = Date.now()
if (!req.session || !req.session.wallet) {
return sendResponse(req, res, errorResponseUnauthorized('User must be logged in'))
}
let serviceEndpoint
try {
serviceEndpoint = await getOwnEndpoint(req)
} catch (e) {
return sendResponse(req, res, errorResponseServerError(e))
}
let creatorNodeEndpoints
try {
creatorNodeEndpoints = await getCreatorNodeEndpoints(req, req.session.wallet)
} catch (e) {
return sendResponse(req, res, errorResponseServerError(e))
}
const primary = creatorNodeEndpoints[0]
// Error if this node is not primary for user.
if (!primary || (primary && serviceEndpoint !== primary)) {
return sendResponse(
req,
res,
errorResponseUnauthorized(`This node (${serviceEndpoint}) is not primary for user. Primary is: ${primary}`)
)
}
req.session.nodeIsPrimary = true
req.session.creatorNodeEndpoints = creatorNodeEndpoints
console.log(`ensurePrimaryMiddleware route time ${Date.now() - start}`)
next()
}
/**
* Tell all secondaries to sync against self.
* @dev - Is not a middleware so it can be run before responding to client.
*/
async function triggerSecondarySyncs (req) {
if (config.get('isUserMetadataNode') || config.get('snapbackDevModeEnabled')) return
try {
if (!req.session.nodeIsPrimary || !req.session.creatorNodeEndpoints || !Array.isArray(req.session.creatorNodeEndpoints)) return
const [primary, ...secondaries] = req.session.creatorNodeEndpoints
await Promise.all(secondaries.map(async secondary => {
if (!secondary || !_isFQDN(secondary)) return
const axiosReq = {
baseURL: secondary,
url: '/sync',
method: 'post',
data: {
wallet: [req.session.wallet],
creator_node_endpoint: primary,
immediate: false
}
}
return axios(axiosReq)
}))
} catch (e) {
req.logger.error(`Trigger secondary syncs ${req.session.wallet}`, e.message)
}
}
/** Retrieves current FQDN registered on-chain with node's owner wallet. */
// TODO - this can all be cached on startup, but we can't validate the spId on startup unless the
// services has been registered, and we can't register the service unless the service starts up.
// Bit of a chicken and egg problem here with timing of first time setup, but potential optimization here
async function getOwnEndpoint (req) {
if (config.get('isUserMetadataNode')) throw new Error('Not available for userMetadataNode')
const libs = req.app.get('audiusLibs')
let creatorNodeEndpoint = config.get('creatorNodeEndpoint')
if (!creatorNodeEndpoint) throw new Error('Must provide either creatorNodeEndpoint config var.')
const spId = await libs.ethContracts.ServiceProviderFactoryClient.getServiceProviderIdFromEndpoint(creatorNodeEndpoint)
if (!spId) throw new Error('Cannot get spId for node')
const spInfo = await libs.ethContracts.ServiceProviderFactoryClient.getServiceEndpointInfo('creator-node', spId)
// Confirm on-chain endpoint exists and is valid FQDN
// Error condition is met if any of the following are true
// - No spInfo returned from chain
// - Configured spOwnerWallet does not match on chain spOwnerWallet
// - Configured delegateOwnerWallet does not match on chain delegateOwnerWallet
// - Endpoint returned from chain but is an invalid FQDN, preventing successful operations
// - Endpoint returned from chain does not match configured endpoint
if (!spInfo ||
!spInfo.hasOwnProperty('endpoint') ||
(spInfo.owner.toLowerCase() !== config.get('spOwnerWallet').toLowerCase()) ||
(spInfo.delegateOwnerWallet.toLowerCase() !== config.get('delegateOwnerWallet').toLowerCase()) ||
(spInfo['endpoint'] && !_isFQDN(spInfo['endpoint'])) ||
(spInfo['endpoint'] !== creatorNodeEndpoint)
) {
throw new Error(`Cannot getOwnEndpoint for node. Returned from chain=${JSON.stringify(spInfo)}, configs=(creatorNodeEndpoint=${config.get('creatorNodeEndpoint')}, spOwnerWallet=${config.get('spOwnerWallet')}, delegateOwnerWallet=${config.get('delegateOwnerWallet')})`)
}
return spInfo['endpoint']
}
/** Get all creator node endpoints for user by wallet from discprov. */
async function getCreatorNodeEndpoints (req, wallet) {
if (config.get('isUserMetadataNode')) throw new Error('Not available for userMetadataNode')
const libs = req.app.get('audiusLibs')
req.logger.info(`Starting getCreatorNodeEndpoints for wallet ${wallet}`)
const start = Date.now()
// Poll discprov until it has indexed provided blocknumber to ensure up-to-date user data.
let user = null
const { blockNumber } = req.body
if (blockNumber) {
let discprovBlockNumber = -1
const start2 = Date.now()
// In total, will try for 200 seconds.
const MaxRetries = 40
const RetryTimeout = 5000 // 5 seconds
await utils.timeout(1000)
for (let retry = 1; retry <= MaxRetries; retry++) {
req.logger.info(`getCreatorNodeEndpoints retry #${retry}/${MaxRetries} || time from start: ${Date.now() - start2} discprovBlockNumber ${discprovBlockNumber} || blockNumber ${blockNumber}`)
try {
const fetchedUser = await libs.User.getUsers(1, 0, null, wallet)
if (!fetchedUser || fetchedUser.length === 0 || !fetchedUser[0].hasOwnProperty('blocknumber') || !fetchedUser[0].hasOwnProperty('track_blocknumber')) {
throw new Error('Missing or malformatted user fetched from discprov.')
}
user = fetchedUser
discprovBlockNumber = Math.max(user[0].blocknumber, user[0].track_blocknumber)
if (discprovBlockNumber >= blockNumber) {
break
}
} catch (e) { // Ignore all errors until MaxRetries exceeded.
req.logger.info(e)
}
await utils.timeout(RetryTimeout)
req.logger.info(`getCreatorNodeEndpoints AFTER TIMEOUT retry #${retry}/${MaxRetries} || time from start: ${Date.now() - start2} discprovBlockNumber ${discprovBlockNumber} || blockNumber ${blockNumber}`)
}
if (discprovBlockNumber < blockNumber) {
throw new Error(`Discprov still outdated after ${MaxRetries}. Discprov blocknumber ${discprovBlockNumber} requested blocknumber ${blockNumber}`)
}
if (!user) {
throw new Error(`Failed to retrieve user from discprov after ${MaxRetries} retries. Aborting.`)
}
} else {
req.logger.info(`getCreatorNodeEndpoints || no blockNumber passed, fetching user without retries.`)
user = await libs.User.getUsers(1, 0, null, wallet)
}
if (!user || user.length === 0 || !user[0].hasOwnProperty('creator_node_endpoint')) {
throw new Error(`Invalid return data from discovery provider for user with wallet ${wallet}.`)
}
const endpoint = user[0]['creator_node_endpoint']
const resp = endpoint ? endpoint.split(',') : []
req.logger.info(`getCreatorNodeEndpoints route time ${Date.now() - start}`)
return resp
}
// Regular expression to check if endpoint is a FQDN. https://regex101.com/r/kIowvx/2
function _isFQDN (url) {
if (config.get('creatorNodeIsDebug')) return true
const FQDN = new RegExp(/(?:^|[ \t])((https?:\/\/)?(?:localhost|[\w-]+(?:\.[\w-]+)+)(:\d+)?(\/\S*)?)/gm)
return FQDN.test(url)
}
module.exports = { authMiddleware, ensurePrimaryMiddleware, triggerSecondarySyncs, syncLockMiddleware, getOwnEndpoint, getCreatorNodeEndpoints }