Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HA: multiple instance replicas #85

Merged
merged 15 commits into from
Jun 2, 2023
191 changes: 163 additions & 28 deletions kubernetes.js
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,12 @@ const createDeployment = async (project, options) => {
localPod.spec.containers[0].resources.limits.cpu = `${stack.cpu * 10}m`
}

const ha = await project.getSetting('ha')
console.log(ha)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
console.log(ha)
console.log(ha)

if (ha?.replicas > 1) {
localDeployment.spec.replicas = ha.replicas
}

project.url = projectURL
await project.save()

Expand Down Expand Up @@ -456,6 +462,21 @@ const createPod = async (project, options) => {
return localPod
}

const getEndpoints = async (project) => {
const prefix = project.safeName.match(/^[0-9]/) ? 'srv-' : ''
if (await project.getSetting('ha')) {
const endpoints = await this._k8sApi.readNamespacedEndpoints(`${prefix}${project.safeName}`, this._namespace)
const addresses = endpoints.body.subsets[0].addresses.map(a => { return a.ip })
const hosts = []
for (const address in addresses) {
hosts.push(addresses[address])
}
return hosts
} else {
return [`${prefix}${project.safeName}.${this._namespace}`]
}
}

module.exports = {
/**
* Initialises this driver
Expand Down Expand Up @@ -727,6 +748,7 @@ module.exports = {
(details.body.status?.conditions[0].type === 'Available' ||
(details.body.status?.conditions[0].type === 'Progressing' && details.body.status?.conditions[0].reason === 'NewReplicaSetAvailable')
)) {
// not calling all endpoints for HA as they should be the same
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels a little risky - in case one instance has hit a hang or other unpleasantness that goes unnoticed.

For an initial iteration, we can go with this, but will need to look at next week.

const infoURL = `http://${prefix}${project.safeName}.${this._namespace}:2880/flowforge/info`
try {
const info = JSON.parse((await got.get(infoURL)).body)
Expand Down Expand Up @@ -759,6 +781,7 @@ module.exports = {
meta: {}
}
} else if (details.body.status?.phase === 'Running') {
// not calling all endpoints for HA as they should be the same
const infoURL = `http://${prefix}${project.safeName}.${this._namespace}:2880/flowforge/info`
try {
const info = JSON.parse((await got.get(infoURL)).body)
Expand Down Expand Up @@ -815,12 +838,36 @@ module.exports = {
if (this._projects[project.id] === undefined) {
return { state: 'unknown' }
}
const prefix = project.safeName.match(/^[0-9]/) ? 'srv-' : ''
await got.post(`http://${prefix}${project.safeName}.${this._namespace}:2880/flowforge/command`, {
json: {
cmd: 'start'
}
})
// const prefix = project.safeName.match(/^[0-9]/) ? 'srv-' : ''
// if (await project.getSetting('ha')) {
// const endpoints = await this._k8sApi.readNamespacedEndpoints(`${prefix}${project.safeName}`, this._namespace)
// const addresses = endpoints.body.subsets[0].addresses.map(a => { return a.ip })
// const commands = []
// for (const address in addresses) {
// commands.push(got.post(`http://${addresses[address]}:2880/flowforge/command`, {
// json: {
// cmd: 'start'
// }
// }))
// }
// await Promise.all(commands)
// } else {
// await got.post(`http://${prefix}${project.safeName}.${this._namespace}:2880/flowforge/command`, {
// json: {
// cmd: 'start'
// }
// })
// }
knolleary marked this conversation as resolved.
Show resolved Hide resolved
const endpoints = await getEndpoints(project)
const commands = []
for (const address in endpoints) {
commands.push(got.post(`http://${endpoints[address]}:2880/flowforge/command`, {
json: {
cmd: 'start'
}
}))
}
await Promise.all(commands)
return { status: 'okay' }
},

Expand All @@ -833,12 +880,36 @@ module.exports = {
if (this._projects[project.id] === undefined) {
return { state: 'unknown' }
}
const prefix = project.safeName.match(/^[0-9]/) ? 'srv-' : ''
await got.post(`http://${prefix}${project.safeName}.${this._namespace}:2880/flowforge/command`, {
json: {
cmd: 'stop'
}
})
// const prefix = project.safeName.match(/^[0-9]/) ? 'srv-' : ''
// if (await project.getSetting('ha')) {
// const endpoints = await this._k8sApi.readNamespacedEndpoints(`${prefix}${project.safeName}`, this._namespace)
// const addresses = endpoints.body.subsets[0].addresses.map(a => { return a.ip })
// const commands = []
// for (const address in addresses) {
// commands.push(got.post(`http://${addresses[address]}:2880/flowforge/command`, {
// json: {
// cmd: 'stop'
// }
// }))
// }
// await Promise.all(commands)
// } else {
// await got.post(`http://${prefix}${project.safeName}.${this._namespace}:2880/flowforge/command`, {
// json: {
// cmd: 'stop'
// }
// })
// }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// const prefix = project.safeName.match(/^[0-9]/) ? 'srv-' : ''
// if (await project.getSetting('ha')) {
// const endpoints = await this._k8sApi.readNamespacedEndpoints(`${prefix}${project.safeName}`, this._namespace)
// const addresses = endpoints.body.subsets[0].addresses.map(a => { return a.ip })
// const commands = []
// for (const address in addresses) {
// commands.push(got.post(`http://${addresses[address]}:2880/flowforge/command`, {
// json: {
// cmd: 'stop'
// }
// }))
// }
// await Promise.all(commands)
// } else {
// await got.post(`http://${prefix}${project.safeName}.${this._namespace}:2880/flowforge/command`, {
// json: {
// cmd: 'stop'
// }
// })
// }

const endpoints = await getEndpoints(project)
const commands = []
for (const address in endpoints) {
commands.push(got.post(`http://${endpoints[address]}:2880/flowforge/command`, {
json: {
cmd: 'stop'
}
}))
}
await Promise.all(commands)
return Promise.resolve({ status: 'okay' })
},

Expand All @@ -851,9 +922,21 @@ module.exports = {
if (this._projects[project.id] === undefined) {
return { state: 'unknown' }
}
const prefix = project.safeName.match(/^[0-9]/) ? 'srv-' : ''
const result = await got.get(`http://${prefix}${project.safeName}.${this._namespace}:2880/flowforge/logs`).json()
return result
if (await project.getSetting('ha')) {
const addresses = await getEndpoints(project)
const logRequests = []
for (const address in addresses) {
logRequests.push(got.get(`http://${addresses[address]}:2880/flowforge/logs`).json())
}
const results = await Promise.all(logRequests)
const combinedResults = results.flat(1)
combinedResults.sort((a, b) => { return a.ts - b.ts })
return combinedResults
} else {
const prefix = project.safeName.match(/^[0-9]/) ? 'srv-' : ''
const result = await got.get(`http://${prefix}${project.safeName}.${this._namespace}:2880/flowforge/logs`).json()
return result
}
},

/**
Expand All @@ -865,12 +948,36 @@ module.exports = {
if (this._projects[project.id] === undefined) {
return { state: 'unknown' }
}
const prefix = project.safeName.match(/^[0-9]/) ? 'srv-' : ''
await got.post(`http://${prefix}${project.safeName}.${this._namespace}:2880/flowforge/command`, {
json: {
cmd: 'restart'
}
})
// const prefix = project.safeName.match(/^[0-9]/) ? 'srv-' : ''
// if (project.getSetting('ha')) {
// const endpoints = await this._k8sApi.readNamespacedEndpoints(`${prefix}${project.safeName}`, this._namespace)
// const addresses = endpoints.body.subsets[0].addresses.map(a => { return a.ip })
// const commands = []
// for (const address in addresses) {
// commands.push(got.post(`http://${addresses[address]}:2880/flowforge/command`, {
// json: {
// cmd: 'restart'
// }
// }))
// }
// await Promise.all(commands)
// } else {
// await got.post(`http://${prefix}${project.safeName}.${this._namespace}:2880/flowforge/command`, {
// json: {
// cmd: 'restart'
// }
// })
// }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// const prefix = project.safeName.match(/^[0-9]/) ? 'srv-' : ''
// if (project.getSetting('ha')) {
// const endpoints = await this._k8sApi.readNamespacedEndpoints(`${prefix}${project.safeName}`, this._namespace)
// const addresses = endpoints.body.subsets[0].addresses.map(a => { return a.ip })
// const commands = []
// for (const address in addresses) {
// commands.push(got.post(`http://${addresses[address]}:2880/flowforge/command`, {
// json: {
// cmd: 'restart'
// }
// }))
// }
// await Promise.all(commands)
// } else {
// await got.post(`http://${prefix}${project.safeName}.${this._namespace}:2880/flowforge/command`, {
// json: {
// cmd: 'restart'
// }
// })
// }

const endpoints = await getEndpoints(project)
const commands = []
for (const address in endpoints) {
commands.push(got.post(`http://${endpoints[address]}:2880/flowforge/command`, {
json: {
cmd: 'restart'
}
}))
}
await Promise.all(commands)
return { state: 'okay' }
},
/**
Expand All @@ -880,18 +987,46 @@ module.exports = {
* @return {forge.Status}
*/
revokeUserToken: async (project, token) => { // logout:nodered(step-3)
const prefix = project.safeName.match(/^[0-9]/) ? 'srv-' : ''
try {
this._app.log.debug(`[k8s] Project ${project.id} - logging out node-red instance`)
await got.post(`http://${prefix}${project.safeName}.${this._namespace}:2880/flowforge/command`, { // logout:nodered(step-4)
this._app.log.debug(`[k8s] Project ${project.id} - logging out node-red instance`)
// const prefix = project.safeName.match(/^[0-9]/) ? 'srv-' : ''
// if (project.getSetting('ha')) {
// const endpoints = await this._k8sApi.readNamespacedEndpoints(`${prefix}${project.safeName}`, this._namespace)
// const addresses = endpoints.body.subsets[0].addresses.map(a => { return a.ip })
// const commands = []
// for (const address in addresses) {
// commands.push(got.post(`http://${addresses[address]}:2880/flowforge/command`, { // logout:nodered(step-4)
// json: {
// cmd: 'logout',
// token
// }
// }))
// }
// Promise.all(commands).catch(error => {
// this._app.log.error(`[k8s] Project ${project.id} - error in 'revokeUserToken': ${error.stack}`)
// })
// } else {
// try {
// await got.post(`http://${prefix}${project.safeName}.${this._namespace}:2880/flowforge/command`, { // logout:nodered(step-4)
// json: {
// cmd: 'logout',
// token
// }
// })
// } catch (error) {
// this._app.log.error(`[k8s] Project ${project.id} - error in 'revokeUserToken': ${error.stack}`)
// }
// }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// const prefix = project.safeName.match(/^[0-9]/) ? 'srv-' : ''
// if (project.getSetting('ha')) {
// const endpoints = await this._k8sApi.readNamespacedEndpoints(`${prefix}${project.safeName}`, this._namespace)
// const addresses = endpoints.body.subsets[0].addresses.map(a => { return a.ip })
// const commands = []
// for (const address in addresses) {
// commands.push(got.post(`http://${addresses[address]}:2880/flowforge/command`, { // logout:nodered(step-4)
// json: {
// cmd: 'logout',
// token
// }
// }))
// }
// Promise.all(commands).catch(error => {
// this._app.log.error(`[k8s] Project ${project.id} - error in 'revokeUserToken': ${error.stack}`)
// })
// } else {
// try {
// await got.post(`http://${prefix}${project.safeName}.${this._namespace}:2880/flowforge/command`, { // logout:nodered(step-4)
// json: {
// cmd: 'logout',
// token
// }
// })
// } catch (error) {
// this._app.log.error(`[k8s] Project ${project.id} - error in 'revokeUserToken': ${error.stack}`)
// }
// }

const endpoints = await getEndpoints(project)
const commands = []
for (const address in endpoints) {
commands.push(got.post(`http://${endpoints[address]}:2880/flowforge/command`, {
json: {
cmd: 'logout',
token
}
})
} catch (error) {
this._app.log.error(`[k8s] Project ${project.id} - error in 'revokeUserToken': ${error.stack}`)
}))
}
await Promise.all(commands)
},
/**
* Shutdown Driver
Expand Down