Skip to content

Commit

Permalink
Merge pull request #85 from flowforge/2156-ha-replicas
Browse files Browse the repository at this point in the history
HA: multiple instance replicas
  • Loading branch information
knolleary committed Jun 2, 2023
2 parents a685a91 + daa05a4 commit c650b6a
Showing 1 changed file with 74 additions and 28 deletions.
102 changes: 74 additions & 28 deletions kubernetes.js
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,11 @@ const createDeployment = async (project, options) => {
localPod.spec.containers[0].resources.limits.cpu = `${stack.cpu * 10}m`
}

const ha = await project.getSetting('ha')
if (ha?.replicas > 1) {
localDeployment.spec.replicas = ha.replicas
}

project.url = projectURL
await project.save()

Expand Down Expand Up @@ -460,6 +465,21 @@ const createPod = async (project, options) => {
return localPod
}

const getEndpoints = async (project) => {
const prefix = project.safeName.match(/^[0-9]/) ? 'srv-' : ''
if (await project.getSetting('ha')) {
const endpoints = await this._k8sApi.readNamespacedEndpoints(`${prefix}${project.safeName}`, this._namespace)
const addresses = endpoints.body.subsets[0].addresses.map(a => { return a.ip })
const hosts = []
for (const address in addresses) {
hosts.push(addresses[address])
}
return hosts
} else {
return [`${prefix}${project.safeName}.${this._namespace}`]
}
}

module.exports = {
/**
* Initialises this driver
Expand Down Expand Up @@ -731,6 +751,7 @@ module.exports = {
(details.body.status?.conditions[0].type === 'Available' ||
(details.body.status?.conditions[0].type === 'Progressing' && details.body.status?.conditions[0].reason === 'NewReplicaSetAvailable')
)) {
// not calling all endpoints for HA as they should be the same
const infoURL = `http://${prefix}${project.safeName}.${this._namespace}:2880/flowforge/info`
try {
const info = JSON.parse((await got.get(infoURL)).body)
Expand Down Expand Up @@ -763,6 +784,7 @@ module.exports = {
meta: {}
}
} else if (details.body.status?.phase === 'Running') {
// not calling all endpoints for HA as they should be the same
const infoURL = `http://${prefix}${project.safeName}.${this._namespace}:2880/flowforge/info`
try {
const info = JSON.parse((await got.get(infoURL)).body)
Expand Down Expand Up @@ -819,12 +841,16 @@ module.exports = {
if (this._projects[project.id] === undefined) {
return { state: 'unknown' }
}
const prefix = project.safeName.match(/^[0-9]/) ? 'srv-' : ''
await got.post(`http://${prefix}${project.safeName}.${this._namespace}:2880/flowforge/command`, {
json: {
cmd: 'start'
}
})
const endpoints = await getEndpoints(project)
const commands = []
for (const address in endpoints) {
commands.push(got.post(`http://${endpoints[address]}:2880/flowforge/command`, {
json: {
cmd: 'start'
}
}))
}
await Promise.all(commands)
return { status: 'okay' }
},

Expand All @@ -837,12 +863,16 @@ module.exports = {
if (this._projects[project.id] === undefined) {
return { state: 'unknown' }
}
const prefix = project.safeName.match(/^[0-9]/) ? 'srv-' : ''
await got.post(`http://${prefix}${project.safeName}.${this._namespace}:2880/flowforge/command`, {
json: {
cmd: 'stop'
}
})
const endpoints = await getEndpoints(project)
const commands = []
for (const address in endpoints) {
commands.push(got.post(`http://${endpoints[address]}:2880/flowforge/command`, {
json: {
cmd: 'stop'
}
}))
}
await Promise.all(commands)
return Promise.resolve({ status: 'okay' })
},

Expand All @@ -855,9 +885,21 @@ module.exports = {
if (this._projects[project.id] === undefined) {
return { state: 'unknown' }
}
const prefix = project.safeName.match(/^[0-9]/) ? 'srv-' : ''
const result = await got.get(`http://${prefix}${project.safeName}.${this._namespace}:2880/flowforge/logs`).json()
return result
if (await project.getSetting('ha')) {
const addresses = await getEndpoints(project)
const logRequests = []
for (const address in addresses) {
logRequests.push(got.get(`http://${addresses[address]}:2880/flowforge/logs`).json())
}
const results = await Promise.all(logRequests)
const combinedResults = results.flat(1)
combinedResults.sort((a, b) => { return a.ts - b.ts })
return combinedResults
} else {
const prefix = project.safeName.match(/^[0-9]/) ? 'srv-' : ''
const result = await got.get(`http://${prefix}${project.safeName}.${this._namespace}:2880/flowforge/logs`).json()
return result
}
},

/**
Expand All @@ -869,12 +911,16 @@ module.exports = {
if (this._projects[project.id] === undefined) {
return { state: 'unknown' }
}
const prefix = project.safeName.match(/^[0-9]/) ? 'srv-' : ''
await got.post(`http://${prefix}${project.safeName}.${this._namespace}:2880/flowforge/command`, {
json: {
cmd: 'restart'
}
})
const endpoints = await getEndpoints(project)
const commands = []
for (const address in endpoints) {
commands.push(got.post(`http://${endpoints[address]}:2880/flowforge/command`, {
json: {
cmd: 'restart'
}
}))
}
await Promise.all(commands)
return { state: 'okay' }
},
/**
Expand All @@ -884,18 +930,18 @@ module.exports = {
* @return {forge.Status}
*/
revokeUserToken: async (project, token) => { // logout:nodered(step-3)
const prefix = project.safeName.match(/^[0-9]/) ? 'srv-' : ''
try {
this._app.log.debug(`[k8s] Project ${project.id} - logging out node-red instance`)
await got.post(`http://${prefix}${project.safeName}.${this._namespace}:2880/flowforge/command`, { // logout:nodered(step-4)
this._app.log.debug(`[k8s] Project ${project.id} - logging out node-red instance`)
const endpoints = await getEndpoints(project)
const commands = []
for (const address in endpoints) {
commands.push(got.post(`http://${endpoints[address]}:2880/flowforge/command`, {
json: {
cmd: 'logout',
token
}
})
} catch (error) {
this._app.log.error(`[k8s] Project ${project.id} - error in 'revokeUserToken': ${error.stack}`)
}))
}
await Promise.all(commands)
},
/**
* Shutdown Driver
Expand Down

0 comments on commit c650b6a

Please sign in to comment.