Skip to content

Commit

Permalink
[ASI-614] Bull queue to expire user session tokens (#1905)
Browse files Browse the repository at this point in the history
* [ASI-614] Bull queue to expire user session tokens

Prevent replay attacks by expiring user session tokens with a daily cron
and Bull queue in content node.

See also:

[ASI-614][1]

[1]: https://linear.app/audius/issue/ASI-614

* [ASI-614] Batch redis tx for latency, retry DB tx

* Use Redis MULTI feature to batch handle transactions in
SessionExpirationQueue
* Implement retry for DB bulk session delete call in SessionManager before rollback

See also:

[ASI-614][1]

[1]: https://linear.app/audius/issue/ASI-614
i

* [ASI-614] Add test scaffolding

See also:

[ASI-614][1]

[1]: https://linear.app/audius/issue/ASI-614

* [ASI-614] Test coverage for session expiration

* add tests for SessionManager
* add tests for DBManager

See also:

[ASI-614][1]

[1]: https://linear.app/audius/issue/ASI-614

* [ASI-614] Clean up logs and documentation for session token expiration queue

* Fix documentation for dbManager
  • Loading branch information
csjiang committed Nov 15, 2021
1 parent de17704 commit c538f27
Show file tree
Hide file tree
Showing 8 changed files with 382 additions and 20 deletions.
73 changes: 57 additions & 16 deletions creator-node/src/dbManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,18 @@ const sequelize = models.sequelize

class DBManager {
/**
* Given file insert query object and cnodeUserUUID, inserts new file record in DB
* Entrypoint for writes/destructive DB operations.
*
* Functionality:
* A. Given file insert query object and cnodeUserUUID, inserts new file record in DB
* and handles all required clock management.
* Steps:
* 1. increments cnodeUser clock value by 1
* 2. insert new ClockRecord entry with new clock value
* 3. insert new Data Table (File, Track, AudiusUser) entry with queryObj and new clock value
* In steps 2 and 3, clock values are read as subquery to guarantee atomicity
*
* B. Given a list of IDs, batch deletes user session tokens to expire sessions on the server-side.
*/
static async createNewDataRecord (queryObj, cnodeUserUUID, sequelizeTableInstance, transaction) {
// Increment CNodeUser.clock value by 1
Expand Down Expand Up @@ -44,15 +49,13 @@ class DBManager {
*
* @notice This method is currently unused. It's a legacy function from non-diffed sync which might be needed in the future.
*
* @dev TODO add unit test
*
* @param {*} CNodeUserLookupObj
* @param {*} sequelizeTableInstance
* @param {*} tx
* @param {Transaction} externalTransaction
*/
static async deleteAllCNodeUserDataFromDB ({ lookupCnodeUserUUID, lookupWallet }, externalTransaction) {
const transaction = (externalTransaction) || (await models.sequelize.transaction())
const log = (msg) => logger.info(`DBManager log: ${msg}`)
const log = (msg) => logger.info(`DBManager.deleteAllCNodeUserDataFromDB log: ${msg}`)

const start = Date.now()
let error
Expand All @@ -71,13 +74,14 @@ class DBManager {
}

const cnodeUserUUID = cnodeUser.cnodeUserUUID
log(`deleteAllCNodeUserDataFromDB || cnodeUserUUID: ${cnodeUserUUID} || beginning delete ops`)
const cnodeUserUUIDLog = `cnodeUserUUID: ${cnodeUserUUID}`
log(`${cnodeUserUUIDLog} || beginning delete ops`)

const numAudiusUsersDeleted = await models.AudiusUser.destroy({
where: { cnodeUserUUID },
transaction
})
log(`deleteAllCNodeUserDataFromDB || cnodeUserUUID: ${cnodeUserUUID} || numAudiusUsersDeleted ${numAudiusUsersDeleted}`)
log(`${cnodeUserUUIDLog} || numAudiusUsersDeleted ${numAudiusUsersDeleted}`)

// TrackFiles must be deleted before associated Tracks can be deleted
const numTrackFilesDeleted = await models.File.destroy({
Expand All @@ -87,36 +91,36 @@ class DBManager {
},
transaction
})
log(`deleteAllCNodeUserDataFromDB || cnodeUserUUID: ${cnodeUserUUID} || numTrackFilesDeleted ${numTrackFilesDeleted}`)
log(`${cnodeUserUUIDLog} || numTrackFilesDeleted ${numTrackFilesDeleted}`)

const numTracksDeleted = await models.Track.destroy({
where: { cnodeUserUUID },
transaction
})
log(`deleteAllCNodeUserDataFromDB || cnodeUserUUID: ${cnodeUserUUID} || numTracksDeleted ${numTracksDeleted}`)
log(`${cnodeUserUUIDLog} || numTracksDeleted ${numTracksDeleted}`)

// Delete all remaining files (image / metadata files).
const numNonTrackFilesDeleted = await models.File.destroy({
where: { cnodeUserUUID },
transaction
})
log(`deleteAllCNodeUserDataFromDB || cnodeUserUUID: ${cnodeUserUUID} || numNonTrackFilesDeleted ${numNonTrackFilesDeleted}`)
log(`${cnodeUserUUIDLog} || numNonTrackFilesDeleted ${numNonTrackFilesDeleted}`)

const numClockRecordsDeleted = await models.ClockRecord.destroy({
where: { cnodeUserUUID },
transaction
})
log(`deleteAllCNodeUserDataFromDB || cnodeUserUUID: ${cnodeUserUUID} || numClockRecordsDeleted ${numClockRecordsDeleted}`)
log(`${cnodeUserUUIDLog} || numClockRecordsDeleted ${numClockRecordsDeleted}`)

const numSessionTokensDeleted = await models.SessionToken.destroy({
where: { cnodeUserUUID },
transaction
})
log(`deleteAllCNodeUserDataFromDB || cnodeUserUUID: ${cnodeUserUUID} || numSessionTokensDeleted ${numSessionTokensDeleted}`)
log(`${cnodeUserUUIDLog} || numSessionTokensDeleted ${numSessionTokensDeleted}`)

// Delete cnodeUser entry
await cnodeUser.destroy({ transaction })
log(`deleteAllCNodeUserDataFromDB || cnodeUserUUID: ${cnodeUserUUID} || cnodeUser entry deleted`)
log(`${cnodeUserUUIDLog} || cnodeUser entry deleted`)
} catch (e) {
if (e.message !== 'No cnodeUser found') {
error = e
Expand All @@ -126,14 +130,51 @@ class DBManager {
// TODO - consider not rolling back in case of external transaction, and just throwing instead
if (error) {
await transaction.rollback()
log(`deleteAllCNodeUserDataFromDB || rolling back transaction due to error ${error}`)
log(`rolling back transaction due to error ${error}`)
} else if (!externalTransaction) {
// Commit transaction if no error and no external transaction provided
await transaction.commit()
log(`commited internal transaction`)
}

log(`completed in ${Date.now() - start}ms`)
}
}

/**
* Deletes all session tokens matching an Array of SessionTokens.
*
* @param {Array} sessionTokens from the SessionTokens table
* @param {Transaction} externalTransaction
*/
static async deleteSessionTokensFromDB (sessionTokens, externalTransaction) {
const transaction = (externalTransaction) || (await models.sequelize.transaction())
const log = (msg) => logger.info(`DBManager.deleteSessionTokensFromDB || log: ${msg}`)
const ids = sessionTokens.map(st => st.id)
const start = Date.now()
let error
try {
log(`beginning delete ops`)

const numSessionTokensDeleted = await models.SessionToken.destroy({
where: { id: ids },
transaction
})
log(`numSessionTokensDeleted ${numSessionTokensDeleted}`)
} catch (e) {
error = e
} finally {
// Rollback transaction on error
if (error) {
await transaction.rollback()
log(`rolling back transaction due to error ${error}`)
} else if (!externalTransaction) {
// Commit transaction if no error and no external transaction provided
await transaction.commit()
log(`deleteAllCNodeUserDataFromDB || commited internal transaction`)
log(`commited internal transaction`)
}

log(`deleteAllCNodeUserDataFromDB || completed in ${Date.now() - start}ms`)
log(`completed in ${Date.now() - start}ms`)
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions creator-node/src/serviceRegistry.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const { logger } = require('./logging')
const utils = require('./utils')
const SyncQueue = require('./services/sync/syncQueue')
const SkippedCIDsRetryQueue = require('./services/sync/skippedCIDsRetryService')
const SessionExpirationQueue = require('./services/SessionExpirationQueue')

/**
* `ServiceRegistry` is a container responsible for exposing various
Expand All @@ -22,6 +23,7 @@ const SkippedCIDsRetryQueue = require('./services/sync/skippedCIDsRetryService')
* - `blackListManager`: responsible for handling blacklisted content
* - `libs`: an instance of Audius Libs
* - `monitoringQueue`: recurring job to monitor node state & performance metrics
* - `sessionExpirationQueue`: recurring job to clear expired session tokens from Redis and DB
* - `nodeConfig`: exposes config object
* - `snapbackSM`: SnapbackStateMachine is responsible for recurring sync and reconfig operations
* - `URSMRegistrationManager`: registers node on L2 URSM contract, no-ops afterward
Expand All @@ -36,6 +38,7 @@ class ServiceRegistry {
this.ipfsLatest = ipfsLatest
this.blacklistManager = BlacklistManager
this.monitoringQueue = new MonitoringQueue()
this.sessionExpirationQueue = new SessionExpirationQueue()

// below services are initialized separately in below functions `initServices()` and `initServicesThatRequireServer()`
this.libs = null
Expand Down Expand Up @@ -63,6 +66,7 @@ class ServiceRegistry {

// Intentionally not awaitted
this.monitoringQueue.start()
this.sessionExpirationQueue.start()

this.servicesInitialized = true
}
Expand Down
112 changes: 112 additions & 0 deletions creator-node/src/services/SessionExpirationQueue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
const Bull = require('bull')
const Sequelize = require('sequelize')
const sessionManager = require('../sessionManager')
const config = require('../config')
const { logger } = require('../logging')
const { SessionToken } = require('../models')

const RUN_INTERVAL = 60 * 1000 * 60 * 24 // daily run
const SESSION_EXPIRATION_AGE = 60 * 1000 * 60 * 24 * 14 // 2 weeks
const BATCH_SIZE = 100
const PROCESS_NAMES = Object.freeze({
expire_sessions: 'expire_sessions'
})

/**
* A persistent cron-style queue that periodically deletes expired session tokens from Redis cache and the database. Runs on startup, deleting 100 sessions at a time, and then runs daily to clear sessions older than 14d.
*
*/
class SessionExpirationQueue {
constructor () {
this.sessionExpirationAge = SESSION_EXPIRATION_AGE
this.batchSize = BATCH_SIZE
this.runInterval = RUN_INTERVAL
this.queue = new Bull(
'session-expiration-queue',
{
redis: {
port: config.get('redisPort'),
host: config.get('redisHost')
},
defaultJobOptions: {
removeOnComplete: true,
removeOnFail: true
}
}
)
this.logStatus = this.logStatus.bind(this)
this.expireSessions = this.expireSessions.bind(this)

// Clean up anything that might be still stuck in the queue on restart
this.queue.empty()

this.queue.process(
PROCESS_NAMES.expire_sessions,
/* concurrency */ 1,
async (job, done) => {
try {
this.logStatus('Starting')
let progress = 0
const SESSION_EXPIRED_CONDITION = {
where: {
created_at: {
[Sequelize.Op.gt]: new Date(Date.now() - this.sessionExpirationAge)
}
}
}
const numExpiredSessions = await SessionToken.count(SESSION_EXPIRED_CONDITION)
this.logStatus(`${numExpiredSessions} expired sessions ready for deletion.`)

let sessionsToDelete = numExpiredSessions
while (sessionsToDelete > 0) {
await this.expireSessions(SESSION_EXPIRED_CONDITION)
progress += (this.batchSize / numExpiredSessions) * 100
job.progress(progress)
sessionsToDelete -= this.batchSize
}
done(null, {})
} catch (e) {
this.logStatus(`Error ${e}`)
done(e)
}
}
)
}

async expireSessions (sessionExpiredCondition) {
const sessionsToDelete = await SessionToken.findAll(Object.assign(sessionExpiredCondition, { limit: this.batchSize }))
await sessionManager.deleteSessions(sessionsToDelete)
}

/**
* Logs a status message and includes current queue info
* @param {string} message
*/
async logStatus (message) {
const { waiting, active, completed, failed, delayed } = await this.queue.getJobCounts()
logger.info(`Session Expiration Queue: ${message} || active: ${active}, waiting: ${waiting}, failed ${failed}, delayed: ${delayed}, completed: ${completed} `)
}

/**
* Starts the session expiration queue on a daily cron.
*/
async start () {
try {
// Run the job immediately
await this.queue.add(PROCESS_NAMES.expire_sessions)

// Then enqueue the job to run on a regular interval
setInterval(async () => {
try {
await this.queue.add(PROCESS_NAMES.expire_sessions)
} catch (e) {
this.logStatus('Failed to enqueue!')
}
}, this.runInterval)
} catch (e) {
this.logStatus('Startup failed!')
}
}
}

module.exports = SessionExpirationQueue
19 changes: 19 additions & 0 deletions creator-node/src/sessionManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const randomBytes = promisify(crypto.randomBytes)

const models = require('./models')
const redisClient = require('./redis')
const DBManager = require('./dbManager')

const sessionTokenHeaderKey = 'X-Session-ID'
const sessionTokenLength = 40
Expand Down Expand Up @@ -41,6 +42,24 @@ class SessionManager {
await session.destroy()
await redisClient.del(`SESSION.${sessionToken}`)
}

static async deleteSessions (sessionTokens) {
const txCommands = sessionTokens.map(({ token }) => ['del', `SESSION.${token}`])
try {
await DBManager.deleteSessionTokensFromDB(sessionTokens)
} catch (e1) {
try {
await DBManager.deleteSessionTokensFromDB(sessionTokens)
} catch (e2) {
throw new Error(`[sessionManager]: Failure (and retry failure) when deleting expired sessions from DB: ${e1.message}\n$`)
}
}
try {
await redisClient.multi(txCommands).exec()
} catch (e) {
throw new Error(`[sessionManager]: Error when deleting expired sessions from Redis: ${e.message}`)
}
}
}

async function _getSessionFromRedis (token) {
Expand Down
40 changes: 39 additions & 1 deletion creator-node/test/dbManager.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const models = require('../src/models')
const DBManager = require('../src/dbManager')
const BlacklistManager = require('../src/blacklistManager')
const utils = require('../src/utils')
const { createStarterCNodeUser, getCNodeUser, destroyUsers } = require('./lib/dataSeeds')
const { createStarterCNodeUser, getCNodeUser, destroyUsers, createSession } = require('./lib/dataSeeds')
const { getApp } = require('./lib/app')
const { getIPFSMock } = require('./lib/ipfsMock')
const { getLibsMock } = require('./lib/libsMock')
Expand Down Expand Up @@ -485,4 +485,42 @@ describe('Test ClockRecord model', async function () {
})
})

describe('Test deleteSessionTokensFromDB when provided an Array of SessionTokens that all exist in the SessionToken table', async function () {
const initialClockVal = 0
let cnodeUserUUID, server, token1, token2

/** Init server to run DB migrations */
before(async function () {
const appInfo = await getApp(getIPFSMock(), getLibsMock(), BlacklistManager)
server = appInfo.server
})

/** Reset DB state + Create cnodeUser + confirm initial clock state + define global vars */
beforeEach(async function () {
// Wipe all CNodeUsers + dependent data
await destroyUsers()
const resp = await createStarterCNodeUser()
cnodeUserUUID = resp.cnodeUserUUID
// Confirm initial clock val in DB
const cnodeUser = await getCNodeUser(cnodeUserUUID)
assert.strictEqual(cnodeUser.clock, initialClockVal)
// Seed DB
token1 = await createSession()
token2 = await createSession()
await DBManager.deleteSessionTokensFromDB([token1, token2])
})

/** Wipe all CNodeUsers + dependent data */
after(async function () {
await destroyUsers()
await server.close()
})

it('Successfully deletes the session tokens from the DB', async function () {
const deletedToken1 = await models.SessionToken.findByPk(token1.id)
const deletedToken2 = await models.SessionToken.findByPk(token2.id)
assert(deletedToken1 === null)
assert(deletedToken2 === null)
})
})
describe.skip('TODO - Test deleteAllCNodeUserData', async function () { })

0 comments on commit c538f27

Please sign in to comment.