Skip to content

Commit

Permalink
SnapbackSM: Disable cron based scheduling in favor of delay (#932)
Browse files Browse the repository at this point in the history
* Disable cron based scheduling in favor of delay
* Fix cnode middleware condition around 'getOwnEndoint'
* Add spOwnerWallet local config, comments to getOwnEndpoint middleware, minor test corresponding fixes
  • Loading branch information
dmanjunath committed Oct 14, 2020
1 parent 5496c36 commit 620106e
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 31 deletions.
1 change: 1 addition & 0 deletions creator-node/compose/docker-compose.yml
Expand Up @@ -27,6 +27,7 @@ services:
- delegatePrivateKey=${delegatePrivateKey}
- creatorNodeEndpoint=${creatorNodeEndpoint}
- snapbackDevModeEnabled=${snapbackDevModeEnabled}
- spOwnerWallet=${spOwnerWallet}
env_file:
- ./env/base.env
ports:
Expand Down
Empty file.
1 change: 1 addition & 0 deletions creator-node/scripts/run-tests.sh
Expand Up @@ -95,6 +95,7 @@ mkdir -p $storagePath
export delegateOwnerWallet="0x1eC723075E67a1a2B6969dC5CfF0C6793cb36D25"
export delegatePrivateKey="0xdb527e4d4a2412a443c17e1666764d3bba43e89e61129a35f9abc337ec170a5d"
export creatorNodeEndpoint="http://localhost:5000"
export spOwnerWallet="0x1eC723075E67a1a2B6969dC5CfF0C6793cb36D25"

# tests
run_unit_tests
Expand Down
19 changes: 14 additions & 5 deletions creator-node/src/middlewares.js
Expand Up @@ -132,13 +132,22 @@ async function getOwnEndpoint (req) {
const spId = await libs.ethContracts.ServiceProviderFactoryClient.getServiceProviderIdFromEndpoint(creatorNodeEndpoint)
if (!spId) throw new Error('Cannot get spId for node')
const spInfo = await libs.ethContracts.ServiceProviderFactoryClient.getServiceEndpointInfo('creator-node', spId)
// confirm on-chain endpoint exists and is valid FQDN

// Confirm on-chain endpoint exists and is valid FQDN
// Error condition is met if any of the following are true
// - No spInfo returned from chain
// - Configured spOwnerWallet does not match on chain spOwnerWallet
// - Configured delegateOwnerWallet does not match on chain delegateOwnerWallet
// - Endpoint returned from chain but is an invalid FQDN, preventing successful operations
// - Endpoint returned from chain does not match configured endpoint
if (!spInfo ||
!spInfo.hasOwnProperty('endpoint') ||
!(spInfo.owner !== config.get('spOwnerWallet')) ||
!(spInfo.delegateOwnerWallet !== config.get('delegateOwnerWallet')) ||
(spInfo['endpoint'] && !_isFQDN(spInfo['endpoint']))) {
throw new Error(`Cannot getOwnEndpoint for node ${spInfo}`)
(spInfo.owner.toLowerCase() !== config.get('spOwnerWallet').toLowerCase()) ||
(spInfo.delegateOwnerWallet.toLowerCase() !== config.get('delegateOwnerWallet').toLowerCase()) ||
(spInfo['endpoint'] && !_isFQDN(spInfo['endpoint'])) ||
(spInfo['endpoint'] !== creatorNodeEndpoint)
) {
throw new Error(`Cannot getOwnEndpoint for node. Returned from chain=${JSON.stringify(spInfo)}, configs=(creatorNodeEndpoint=${config.get('creatorNodeEndpoint')}, spOwnerWallet=${config.get('spOwnerWallet')}, delegateOwnerWallet=${config.get('delegateOwnerWallet')})`)
}
return spInfo['endpoint']
}
Expand Down
28 changes: 11 additions & 17 deletions creator-node/src/snapbackSM.js
Expand Up @@ -20,8 +20,8 @@ const SyncMonitoringRetryDelay = 15000
// Base value used to filter users over a 24 hour period
const ModuloBase = 24

// 0 */1 * * * every hours at minute 0
const StateMachineSchedule = '0 */1 * * *'
// Delay 1 hour between production state machine jobs
const ProductionJobDelayInMs = 3600000

/*
SnapbackSM aka Snapback StateMachine
Expand Down Expand Up @@ -53,10 +53,10 @@ class SnapbackSM {
logger.info(`SnapbackSM: ${msg}`)
}

// Initialize queue object with provided name
// Initialize queue object with provided name and unix timestamp
createBullQueue (queueName) {
return new Bull(
queueName,
`${queueName}-${Date.now()}`,
{
redis: {
port: config.get('redisPort'),
Expand Down Expand Up @@ -414,19 +414,16 @@ class SnapbackSM {
if (config.get('snapbackDevModeEnabled')) {
this.log(`DEV MODE next job in ${DevDelayInMS}ms at ${new Date(Date.now() + DevDelayInMS)}`)
await utils.timeout(DevDelayInMS)
this.stateMachineQueue.add({ startTime: Date.now() })
} else {
this.log(`Next job in ${ProductionJobDelayInMs}ms at ${new Date(Date.now() + ProductionJobDelayInMs)}`)
await utils.timeout(ProductionJobDelayInMs)
}
await this.stateMachineQueue.add({ startTime: Date.now() })
done()
}
}
)

// Run the task every x time interval
if (!config.get('snapbackDevModeEnabled')) {
this.log(`Enabling cron with following schedule: ${StateMachineSchedule}`)
this.stateMachineQueue.add({}, { repeat: { cron: StateMachineSchedule } })
}

// Initialize sync queue processor function, as drained will issue syncs
// A maximum of 10 sync jobs are allowed to be issued at once
this.syncQueue.process(
Expand All @@ -443,13 +440,10 @@ class SnapbackSM {
}
}
)

// Enqueue first state machine operation if dev mode enabled
if (config.get('snapbackDevModeEnabled')) {
this.stateMachineQueue.add({ startTime: Date.now() })
}

await this.initializeNodeIdentityConfig()

// Enqueue first state machine operation
await this.stateMachineQueue.add({ startTime: Date.now() })
}
}

Expand Down
10 changes: 9 additions & 1 deletion creator-node/test/lib/libsMock.js
Expand Up @@ -15,8 +15,16 @@ function getLibsMock () {
discoveryProviderEndpoint: 'http://docker.for.mac.localhost:5000'
}
}

libsMock.ethContracts.ServiceProviderFactoryClient.getServiceProviderIdFromEndpoint.returns('1')
libsMock.ethContracts.ServiceProviderFactoryClient.getServiceEndpointInfo.returns({ 'endpoint': 'http://localhost:5000' })
libsMock.ethContracts.ServiceProviderFactoryClient.getServiceEndpointInfo.returns({
endpoint: 'http://localhost:5000',
owner: '0x1eC723075E67a1a2B6969dC5CfF0C6793cb36D25',
spID: '1',
type: 'creator-node',
blockNumber: 1234,
delegateOwnerWallet: '0x1eC723075E67a1a2B6969dC5CfF0C6793cb36D25'
})
libsMock.User.getUsers.returns([{ 'creator_node_endpoint': 'http://localhost:5000', 'blocknumber': 10, 'track_blocknumber': 10 }])
libsMock.User.getUsers.atMost(10)

Expand Down
29 changes: 21 additions & 8 deletions libs/initScripts/local.js
Expand Up @@ -230,46 +230,59 @@ const _initAllVersions = async (audiusLibs) => {
}

// Write an update to either the common .sh file for creator nodes or docker env file
const _updateCreatorNodeConfigFile = async (readPath, writePath, delegateOwnerWallet, delegateWalletPkey, endpoint, isShell) => {
const _updateCreatorNodeConfigFile = async (readPath, writePath, ownerWallet, ownerWalletPkey, endpoint, isShell) => {
const fileStream = fs.createReadStream(readPath)
const rl = readline.createInterface({
input: fileStream,
crlfDelay: Infinity
})
let output = []
let walletFound = false
let delegateOwnerWalletFound = false
let spOwnerWalletFound = false
let pkeyFound = false
let endpointFound = false
const ownerWalletLine = `${isShell ? 'export ' : ''}delegateOwnerWallet=${delegateOwnerWallet}`

// Local dev, delegate and owner wallet are equal
let delegateOwnerWallet = ownerWallet
let delegateWalletPkey = ownerWalletPkey

const spOwnerWalletLine = `${isShell ? 'export ' : ''}spOwnerWallet=${ownerWallet}`
const delegateOwnerWalletLine = `${isShell ? 'export ' : ''}delegateOwnerWallet=${delegateOwnerWallet}`
const pkeyLine = `${isShell ? 'export ' : ''}delegatePrivateKey=0x${delegateWalletPkey}`
const endpointLine = `${isShell ? 'export ' : ''}creatorNodeEndpoint=${endpoint}`

for await (const line of rl) {
// Each line in input.txt will be successively available here as `line`.
if (line.includes('delegateOwnerWallet')) {
output.push(ownerWalletLine)
walletFound = true
output.push(delegateOwnerWalletLine)
delegateOwnerWalletFound = true
} else if (line.includes('delegatePrivateKey')) {
output.push(pkeyLine)
pkeyFound = true
} else if (line.includes('creatorNodeEndpoint')) {
output.push(endpointLine)
endpointFound = true
} else if (line.includes('spOwnerWallet')) {
output.push(spOwnerWalletLine)
spOwnerWalletFound = true
} else {
output.push(line)
}
}

if (!walletFound) {
output.push(ownerWalletLine)
if (!delegateOwnerWalletFound) {
output.push(delegateOwnerWalletLine)
}
if (!pkeyFound) {
output.push(pkeyLine)
}
if (!endpointFound) {
output.push(endpointLine)
}
if (!spOwnerWalletFound) {
output.push(spOwnerWalletLine)
}

fs.writeFileSync(writePath, output.join('\n'))
console.log(`Updated ${writePath} with ${delegateOwnerWallet}:${delegateWalletPkey}, endpoint=${endpoint}`)
console.log(`Updated ${writePath} with spOwnerWallet=${ownerWallet}\ndelegateOwnerWallet=${delegateOwnerWallet}\ndelegateWalletPkey=${delegateWalletPkey}\nendpoint=${endpoint}`)
}

0 comments on commit 620106e

Please sign in to comment.