Skip to content

Commit

Permalink
Use SP ID as the source of truth when issuing reconfigs (#1715)
Browse files Browse the repository at this point in the history
* wip

* make reconfig/pot sync ops a method

* update comment

* update reconfig mode tests

* set reconfig mode to highest for local dev

* update test logs

* make spId not valid trigger reconfig

* refactor reconfig mdog tests

* update test runner

* filter out self node if is in replica set

* fix pumba exec command to set packet loss to 0

* add more spId source of truth unit tests

* simplify filter logic

* add back compat for spIds undefined in DN resp

* fix exec command

* removed unnecessary mocked method

* use filtered secondaries arr

* only print if number exit code is returned

* removed unsued reconfig env var

* use helpers delay fn

* address comments

* update test-ci suite with reconfig tests

* Fix maddog sync failures

* bump num creator nodes from 4 to 10

Co-authored-by: SidSethi <SidSethi93@gmail.com>
  • Loading branch information
vicky-g and SidSethi committed Aug 10, 2021
1 parent 3b1b37f commit 9758e84
Show file tree
Hide file tree
Showing 13 changed files with 694 additions and 197 deletions.
4 changes: 2 additions & 2 deletions creator-node/compose/env/base.env
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ snapbackModuloBase=3
minimumDailySyncCount=5
minimumRollingSyncCount=10
minimumSuccessfulSyncCountPercentage=50
secondaryUserSyncDailyFailureCountThreshold=2
snapbackHighestReconfigMode=ONE_SECONDARY
snapbackHighestReconfigMode=PRIMARY_AND_OR_SECONDARIES
secondaryUserSyncDailyFailureCountThreshold=10
minimumSecondaryUserSyncSuccessPercent=50
maxSyncMonitoringDurationInMs=10000 # 10sec

Expand Down
6 changes: 4 additions & 2 deletions creator-node/src/snapbackSM/peerSetManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ class PeerSetManager {
* @notice This function depends on a new discprov route and cannot be consumed until every discprov exposes that route
* It will throw if the route doesn't exist
* @returns {Object[]} array of objects
* - Each object has schema { primary, secondary1, secondary2, user_id, wallet }
* - Each object should have the schema { primary, secondary1, secondary2, user_id, wallet, primarySpID, secondary1SpID, secondary2SpID },
* and at the very least have the schema { primary, secondary1, secondary2, user_id, wallet }
*/
async getAllNodeUsers () {
// Fetch discovery node currently connected to libs as this can change
Expand Down Expand Up @@ -172,7 +173,8 @@ class PeerSetManager {
* Retrieve users with this node as primary
* Leaving this function in until all discovery providers update to new version and expose new `/users/content_node/all` route
* @returns {Object[]} array of objects
* - Each object has schema { primary, secondary1, secondary2, user_id, wallet }
* - Each object should have the schema { primary, secondary1, secondary2, user_id, wallet, primarySpID, secondary1SpID, secondary2SpID }
* and at the very least have the schema { primary, secondary1, secondary2, user_id, wallet }
*/
async getNodePrimaryUsers () {
// Fetch discovery node currently connected to libs as this can change
Expand Down
360 changes: 251 additions & 109 deletions creator-node/src/snapbackSM/snapbackSM.js

Large diffs are not rendered by default.

329 changes: 293 additions & 36 deletions creator-node/test/snapbackSM.test.js

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions discovery-provider/src/api/v1/models/users.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@
"primary": fields.String(required=False),
"secondary1": fields.String(required=False),
"secondary2": fields.String(required=False),
"primarySpID": fields.Integer(required=False),
"secondary1SpID": fields.Integer(required=False),
"secondary2SpID": fields.Integer(required=False),
},
)

Expand Down
2 changes: 1 addition & 1 deletion discovery-provider/src/api/v1/users.py
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,7 @@ def get(self, replica_type):
"""New route to call get_users_cnode with replica_type param (only consumed by content node)
- Leaving `/users/creator_node` above untouched for backwards-compatibility
Response = array of objects of schema { user_id, wallet, primary, secondary1, secondary2 }
Response = array of objects of schema { user_id, wallet, primary, secondary1, secondary2, primarySpId, secondary1SpID, secondary2SpID }
"""
args = users_by_content_node_route_parser.parse_args()

Expand Down
9 changes: 7 additions & 2 deletions discovery-provider/src/queries/get_users_cnode.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,18 @@ def get_users_cnode(cnode_endpoint_string, replica_type=ReplicaType.PRIMARY):
"wallet",
("creator_node_endpoints") [1] as "primary",
("creator_node_endpoints") [2] as "secondary1",
("creator_node_endpoints") [3] as "secondary2"
("creator_node_endpoints") [3] as "secondary2",
"primary_id" as "primarySpID",
("secondary_ids") [1] as "secondary1SpID",
("secondary_ids") [2] as "secondary2SpID"
FROM
(
SELECT
"user_id",
"wallet",
string_to_array("creator_node_endpoint", ',') as "creator_node_endpoints"
string_to_array("creator_node_endpoint", ',') as "creator_node_endpoints",
"primary_id",
"secondary_ids"
FROM
"users"
WHERE
Expand Down
46 changes: 39 additions & 7 deletions mad-dog/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ const {
IpldBlacklistTest,
userReplicaSetBlockSaturationTest,
trackListenCountsTest,
userReplicaSetNodes
SnapbackReconfigTests
} = require('./tests/')

// Configuration.
Expand Down Expand Up @@ -255,16 +255,26 @@ async function main () {
break
}
case 'test-ursm-nodes': {
const test = makeTest(
'userReplicaSetNodesTest',
userReplicaSetNodes,
const deregisterCNTest = makeTest(
'snapbackReconfigTestDeregisterCN',
SnapbackReconfigTests.deregisterCN,
{
numUsers: 8,
numCreatorNodes: 10,
iterations: 3
iterations: 2
}
)
await testRunner([test])

const forceCNUnavailabilityTest = makeTest(
'snapbackReconfigTestForceCNUnavailability',
SnapbackReconfigTests.forceCNUnavailability,
{
numUsers: 8,
numCreatorNodes: 10,
iterations: 2
}
)
await testRunner([deregisterCNTest, forceCNUnavailabilityTest])
break
}
case 'test-ci': {
Expand Down Expand Up @@ -310,13 +320,35 @@ async function main () {
}
)

const deregisterCNTest = makeTest(
'snapbackReconfigTestDeregisterCN',
SnapbackReconfigTests.deregisterCN,
{
numUsers: 2,
numCreatorNodes: 10,
iterations: 2
}
)

const forceCNUnavailabilityTest = makeTest(
'snapbackReconfigTestForceCNUnavailability',
SnapbackReconfigTests.forceCNUnavailability,
{
numUsers: 2,
numCreatorNodes: 10,
iterations: 2
}
)

const tests = [
coreIntegrationTests,
snapbackTest,
...blacklistTests,
ursmTest,
ursmBlockSaturationTest,
trackListenCountTest
trackListenCountTest,
deregisterCNTest,
forceCNUnavailabilityTest
]

await testRunner(tests)
Expand Down
24 changes: 15 additions & 9 deletions mad-dog/src/madDog.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ const makeCreatorNodeName = num => `cn${num}_creator-node_1`
// For now, this only takes down a single node
// per test.
class MadDog {
constructor({
constructor ({
numCreatorNodes,
downProbability = DOWN_PROBABILITY,
pauseDurationSec = PAUSE_DURATION_SEC,
Expand All @@ -42,7 +42,7 @@ class MadDog {
* Starts mad dog. Creates stormy network conditions
* and randomly pauses down a single node over the course of the test.
*/
start(serviceName) {
start (serviceName) {
logger.info('Starting 😡🐶')
this._setNetworkConditions(serviceName)
this.tickToken = setInterval(() => {
Expand All @@ -56,35 +56,41 @@ class MadDog {
}, TICK_INTERVAL_SEC * 1000)
}

stop() {
stop () {
clearInterval(this.tickToken)
if (this.netemProcess) this.netemProcess.kill()
if (this.netemProcess) {
// Kill original process
this.netemProcess.kill()
// Set packet loss to 0% aka revert back to normal
exec(`pumba netem --tc-image gaiadocker/iproute2 -d ${this.networkDownDurationSec}s loss -p 0 containers ${this.netemProcessService}`)
}
if (this.pauseProcess) this.pauseProcess.kill()
}

_setNetworkConditions(serviceName) {
_setNetworkConditions (serviceName) {
const service = serviceName || _.sample(this.services)
logger.info(
`Setting ${this.packetLossPercent}% packet loss for service: ${service}`
)
this.netemProcessService = service
this.netemProcess = exec(`pumba netem --tc-image gaiadocker/iproute2 -d ${this.networkDownDurationSec}s loss -p ${this.packetLossPercent} containers ${service}`)
this._setProcessLogs(this.netemProcess)
}

_setProcessLogs(process) {
_setProcessLogs (process) {
process.stdout.on('data', (data) => logger.info(`stdout: ${data}`))
process.stderr.on('data', (data) => logger.error(`stderr: ${data}`))
process.on('exit', (code) => {
logger.info(`child process exited with code ${code}`)
if (!isNaN(code)) logger.info(`child process exited with code ${code}`)
process = null
})
}

_pauseService(serviceName) {
_pauseService (serviceName) {
const service = serviceName || _.sample(this.services)
didPause = true
logger.info(`Pausing service: [${service}]`)
this.pauseProcess = exec(`pumba pause -d ${this.pauseDurationSec}s constainers ${service}`)
this.pauseProcess = exec(`pumba pause -d ${this.pauseDurationSec}s containers ${service}`)
_setProcessLogs(this.pauseProcess)
setTimeout(() => {
logger.info(`Unpausing service: [${service}]`)
Expand Down
4 changes: 2 additions & 2 deletions mad-dog/src/tests/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ const { userReplicaSetBlockSaturationTest } = require('./test_ursmBlockSaturatio
const { userReplicaSetManagerTest } = require('./test_userReplicaSetManager.js')
const { trackListenCountsTest } = require('./test_plays.js')

const { userReplicaSetNodes } = require('./test_userReplicaSetNodes')
const SnapbackReconfigTests = require('./test_userReplicaSetNodes')

module.exports = {
coreIntegration,
Expand All @@ -14,5 +14,5 @@ module.exports = {
userReplicaSetManagerTest,
userReplicaSetBlockSaturationTest,
trackListenCountsTest,
userReplicaSetNodes
SnapbackReconfigTests
}
4 changes: 2 additions & 2 deletions mad-dog/src/tests/test_userReplicaSetManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,12 @@ const verifyUserReplicaSetStatus = async (
// Throw if mismatch between queried primaryID and assigned
// spID on chain for this endpoint
if (primaryID !== primaryInfo.spID) {
throw new Error(`Mismatch spID values. Expected endpoint for ${primaryID}, found ${primaryInfo.spID}`)
throw new Error(`Mismatch spID values. Indexed value=${primaryID} chain value=${primaryInfo.spID}`)
}

// Throw if mismatch between primaryID from discovery-node and primaryID in UserReplicaSetManager
if (primaryID !== parseInt(usrReplicaInfoFromContract.primaryId)) {
throw new Error(`Mismatch primaryID values. Expected ${primaryID}, found ${usrReplicaInfoFromContract.primaryId}`)
throw new Error(`Mismatch primaryID values. Indexed value=${primaryID} contract value=${usrReplicaInfoFromContract.primaryId}`)
}

logger.info(`userId: ${userId} Replica Set Info: ${primaryID}, ${usrQueryInfo.secondary_ids}`)
Expand Down
81 changes: 66 additions & 15 deletions mad-dog/src/tests/test_userReplicaSetNodes/index.js
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
const { addAndUpgradeUsers } = require('../../helpers.js')
const verifyValidCNs = require('./verifyValidCN')
// const deregisterRandomCreatorNode = require('./deregisterRandomCreatorNode.js')
const deregisterRandomCreatorNode = require('./deregisterRandomCreatorNode.js')
const stopRandomCreatorNode = require('./stopRandomCreatorNode.js')
const setNumCreatorNodes = require('./setNumCreatorNodes.js')
const { uploadTracksforUsers } = require('../../utils/uploadTracksForUsers')
const { logger } = require('../../logger')
const { delay } = require('../../helpers')

const MAX_ATTEMPTS_TO_VALIDATE_REPLICA_SET = 10
const MAX_ATTEMPTS_TO_VALIDATE_REPLICA_SET = 20
const WAIT_INTERVAL_TO_UPDATE_REPLICA_SET_MS = 20000

/**
* Tests that the user replica sets update when a node is deregistered or down
*/
const userReplicaSetNodes = async ({
const deregisterCN = async ({
numUsers = 10,
numCreatorNodes = 10,
iterations,
Expand All @@ -29,33 +31,79 @@ const userReplicaSetNodes = async ({
executeOne
)
} catch (e) {
return { error: `Issue with creating and upgrading users: ${e}` }
return { error: `[Snapback CN Deregistering] Issue with creating and upgrading users: ${e}` }
}

logger.info('[Snapback CN Deregistering] Start')
for (let iteration = 0; iteration < iterations; iteration++) {
creatorNodeIDToInfoMapping = await setNumCreatorNodes(numCreatorNodes, executeOne)

// Upload tracks to users
await uploadTracksforUsers({ executeAll, executeOne, walletIndexToUserIdMap })

// TODO: Implement spID as source of truth feature before uncommenting deregistration test code
const deregisteredCreatorNodeId = await deregisterRandomCreatorNode(creatorNodeIDToInfoMapping)

// const deregisteredCreatorNodeId = await deregisterRandomCreatorNode(creatorNodeIDToInfoMapping)
// Create a MadDog instance, responsible for taking down 1 node
let attempts = 0
let passed = false
let error
while (attempts++ < MAX_ATTEMPTS_TO_VALIDATE_REPLICA_SET) {
await delay(WAIT_INTERVAL_TO_UPDATE_REPLICA_SET_MS)
try {
await verifyValidCNs(executeOne, executeAll, deregisteredCreatorNodeId, walletIndexToUserIdMap, creatorNodeIDToInfoMapping)
passed = true
break
} catch (e) {
error = e
}
}

// await new Promise(resolve => setTimeout(resolve, 10 * 1000))
// await verifyValidCNs(executeOne, executeAll, deregisteredCreatorNodeId, walletIndexToUserIdMap, creatorNodeIDToInfoMapping)
if (!passed) {
return {
error: `[Snapback CN Deregistering] Error with verifying updated replica set: ${error.toString()}`
}
}
}

logger.info('[Snapback CN Deregistering] SUCCESS!')
}

const forceCNUnavailability = async ({
numUsers = 10,
numCreatorNodes = 10,
iterations,
executeAll,
executeOne
}) => {
let creatorNodeIDToInfoMapping = {}
let walletIndexToUserIdMap = {}

// Creates and initialize users if user does not exist. Else, uses existing users.
try {
walletIndexToUserIdMap = await addAndUpgradeUsers(
numUsers,
executeAll,
executeOne
)
} catch (e) {
return { error: `[Snapback CN Unavailability] Issue with creating and upgrading users: ${e}` }
}

logger.info('[Snapback CN Unavailability] Start')

for (let iteration = 0; iteration < iterations; iteration++) {
creatorNodeIDToInfoMapping = await setNumCreatorNodes(numCreatorNodes, executeOne)

// Create a MadDog instance, responsible for taking down 1 node
const {
madDog,
removedCreatorNodeId
} = await stopRandomCreatorNode(creatorNodeIDToInfoMapping)

let error = null
let attempts = 0
let passed = false
let error
while (attempts++ < MAX_ATTEMPTS_TO_VALIDATE_REPLICA_SET) {
await new Promise(resolve => setTimeout(resolve, WAIT_INTERVAL_TO_UPDATE_REPLICA_SET_MS))
await delay(WAIT_INTERVAL_TO_UPDATE_REPLICA_SET_MS)
try {
await verifyValidCNs(executeOne, executeAll, removedCreatorNodeId, walletIndexToUserIdMap, creatorNodeIDToInfoMapping)
passed = true
Expand All @@ -65,16 +113,19 @@ const userReplicaSetNodes = async ({
}
}

madDog.stop()

if (!passed) {
return {
error: `Error with verifying updated replica set: ${error.toString()}`
error: `[Snapback CN Unavailability] Error with verifying updated replica set: ${error.toString()}`
}
}

madDog.stop()
}

logger.info('[Snapback CN Unavailability] SUCCESS!')
}

module.exports = {
userReplicaSetNodes
deregisterCN,
forceCNUnavailability
}

0 comments on commit 9758e84

Please sign in to comment.