Skip to content

Commit

Permalink
Merge pull request #2240 from botpress/ya-ghost-changes
Browse files Browse the repository at this point in the history
fix(ghost): pushing local to remote directly
  • Loading branch information
slvnperron committed Aug 14, 2019
2 parents 1b34499 + c4e8039 commit 8bfb80a
Show file tree
Hide file tree
Showing 11 changed files with 298 additions and 71 deletions.
1 change: 1 addition & 0 deletions package.json
Expand Up @@ -32,6 +32,7 @@
"date-fns": "^1.30.1",
"debug": "^4.1.1",
"deep-diff": "^1.0.2",
"diff": "^4.0.1",
"doctrine": "^2.1.0",
"dotenv": "^7.0.0",
"errorhandler": "^1.5.0",
Expand Down
8 changes: 4 additions & 4 deletions src/bp/core/botpress.ts
Expand Up @@ -273,16 +273,16 @@ export class Botpress {

@WrapErrorsWith('Error initializing Ghost Service')
async initializeGhost(): Promise<void> {
this.ghostService.initialize(process.CLUSTER_ENABLED)
const useDbDriver = process.BPFS_STORAGE === 'database'
this.ghostService.initialize(useDbDriver)
const global = await this.ghostService.global().directoryListing('/')

if (process.CLUSTER_ENABLED && _.isEmpty(global)) {
if (useDbDriver && _.isEmpty(global)) {
this.logger.info('Syncing data/global/ to database')
await this.ghostService.global().sync()
const botsRef = await this.workspaceService.getBotRefs()

this.logger.info('Syncing data/bots/ to database')
await Promise.map(botsRef, botId => this.ghostService.forBot(botId).sync())
await this.ghostService.bots().sync()
}
}

Expand Down
24 changes: 24 additions & 0 deletions src/bp/core/misc/archive.ts
@@ -1,7 +1,10 @@
import fse from 'fs-extra'
import glob from 'glob'
import mkdirp from 'mkdirp'
import path from 'path'
import stream from 'stream'
import tar from 'tar'
import tmp from 'tmp'
import { VError } from 'verror'

import { forceForwardSlashes } from './utils'
Expand Down Expand Up @@ -45,3 +48,24 @@ export const createArchive = async (fileName: string, folder: string, files: str
throw new VError(err, `[Archive] Error creating archive "${fileName}"`)
}
}

export const createArchiveFromFolder = async (folder: string, ignoredFiles: string[]): Promise<Buffer> => {
const tmpDir = tmp.dirSync({ unsafeCleanup: true })

try {
const files: string[] = await Promise.fromCallback(cb =>
glob('**/*', { cwd: folder, ignore: ignoredFiles, nodir: true, dot: true }, cb)
)

for (const file of files) {
await mkdirp.sync(path.dirname(path.join(tmpDir.name, file)))
fse.copyFileSync(path.resolve(folder, file), path.resolve(tmpDir.name, file))
}

const filename = path.join(tmpDir.name, 'archive.tgz')
const archive = await createArchive(filename, tmpDir.name, files)
return await fse.readFile(archive)
} finally {
tmpDir.removeCallback()
}
}
60 changes: 46 additions & 14 deletions src/bp/core/routers/admin/versioning.ts
@@ -1,9 +1,13 @@
import { Logger } from 'botpress/sdk'
import { extractArchive } from 'core/misc/archive'
import { GhostService } from 'core/services'
import { BotService } from 'core/services/bot-service'
import { CMSService } from 'core/services/cms'
import { Router } from 'express'
import _ from 'lodash'
import mkdirp from 'mkdirp'
import path from 'path'
import tmp from 'tmp'

import { CustomRouter } from '../customRouter'

Expand Down Expand Up @@ -42,32 +46,60 @@ export class VersioningRouter extends CustomRouter {
})
)

// Return the list of local and production file changes
this.router.get(
this.router.post(
'/changes',
this.asyncMiddleware(async (req, res) => {
const changes = await this.ghost.listFileChanges()
res.json(changes)
const tmpDir = tmp.dirSync({ unsafeCleanup: true })

try {
await this.extractArchiveFromRequest(req, tmpDir.name)

res.send({ changes: await this.ghost.listFileChanges(tmpDir.name) })
} catch (error) {
res.status(500).send('Error while pushing changes')
} finally {
tmpDir.removeCallback()
}
})
)

// Force update of the production files by the local files
// Force update of the remote files by the local files
this.router.post(
'/update',
this.asyncMiddleware(async (req, res) => {
const botsIds = await this.botService.getBotsIds()
await Promise.map(botsIds, async id => {
await this.ghost.forBot(id).forceUpdate()
const tmpDir = tmp.dirSync({ unsafeCleanup: true })
const beforeBotIds = await this.botService.getBotsIds()

// When force sync, we also need to manually invalidate the CMS
await this.cmsService.clearElementsFromCache(id)
await this.cmsService.loadElementsForBot(id)
})
try {
await this.extractArchiveFromRequest(req, tmpDir.name)
await this.ghost.forceUpdate(tmpDir.name)

await this.ghost.global().forceUpdate()
// we let the caches invalidate and propagate TODO: do this properly instead of waiting
await Promise.delay(5000)

res.status(200).send({ success: true })
// Unmount all previous bots and re-mount only the remaining (and new) bots
await Promise.map(beforeBotIds, id => this.botService.unmountBot(id))
const afterBotIds = await this.botService.getBotsIds()
await Promise.map(afterBotIds, id => this.botService.mountBot(id))

res.sendStatus(200)
} catch (error) {
res.status(500).send('Error while pushing changes')
} finally {
tmpDir.removeCallback()
}
})
)
}

extractArchiveFromRequest = async (request, folder) => {
const dataFolder = path.join(folder, 'data')
await mkdirp.sync(dataFolder)

const buffer: Buffer[] = []
request.on('data', chunk => buffer.push(chunk))

await Promise.fromCallback(cb => request.on('end', cb))
await extractArchive(Buffer.concat(buffer), dataFolder)
}
}
10 changes: 10 additions & 0 deletions src/bp/core/services/cms.ts
Expand Up @@ -31,6 +31,7 @@ export class CMSService implements IDisposeOnExit {
broadcastAddElement: Function = this.local__addElementToCache
broadcastUpdateElement: Function = this.local__updateElementFromCache
broadcastRemoveElements: Function = this.local__removeElementsFromCache
broadcastInvalidateForBot: Function = this.local__invalidateForBot

private readonly contentTable = 'content_elements'
private readonly typesDir = 'content-types'
Expand Down Expand Up @@ -62,6 +63,7 @@ export class CMSService implements IDisposeOnExit {
this.broadcastUpdateElement = await this.jobService.broadcast<ContentElement>(
this.local__updateElementFromCache.bind(this)
)
this.broadcastInvalidateForBot = await this.jobService.broadcast<string>(this.local__invalidateForBot.bind(this))

await this.prepareDb()
await this._loadContentTypesFromFiles()
Expand Down Expand Up @@ -620,4 +622,12 @@ export class CMSService implements IDisposeOnExit {
contentType: contentTypeId
})
}

/**
* Important! Do not use directly. Needs to be broadcasted.
*/
private async local__invalidateForBot(botId: string): Promise<void> {
await this.clearElementsFromCache(botId)
await this.loadElementsForBot(botId)
}
}
117 changes: 102 additions & 15 deletions src/bp/core/services/ghost/service.ts
Expand Up @@ -2,6 +2,7 @@ import { ListenHandle, Logger } from 'botpress/sdk'
import { ObjectCache } from 'common/object-cache'
import { isValidBotId } from 'common/validation'
import { asBytes, forceForwardSlashes } from 'core/misc/utils'
import { diffLines } from 'diff'
import { EventEmitter2 } from 'eventemitter2'
import fse from 'fs-extra'
import { inject, injectable, tagged } from 'inversify'
Expand All @@ -20,7 +21,16 @@ import { FileRevision, PendingRevisions, ReplaceContent, ServerWidePendingRevisi
import DBStorageDriver from './db-driver'
import DiskStorageDriver from './disk-driver'

export type FileChanges = { scope: string; changes: string[] }[]
// TODO: better typings
export type FileChanges = {
scope: string
changes: {
path: string
action: string
add?: number
del?: number
}[]
}[]

const MAX_GHOST_FILE_SIZE = asBytes('100mb')

Expand Down Expand Up @@ -55,25 +65,102 @@ export class GhostService {
)
}

async listFileChanges(): Promise<FileChanges> {
const botsIds = (await this.bots().directoryListing('/', 'bot.config.json')).map(path.dirname)
custom(baseDir: string) {
return new ScopedGhostService(baseDir, this.diskDriver, this.dbDriver, false, this.cache, this.logger)
}

// TODO: refactor this
async forceUpdate(tmpFolder: string) {
const invalidateFile = async (fileName: string) => {
await this.cache.invalidate(`object::${fileName}`)
await this.cache.invalidate(`buffer::${fileName}`)
}

for (const { scope, changes } of await this.listFileChanges(tmpFolder)) {
const dbRevs = await this.dbDriver.listRevisions(scope === 'global' ? 'data/global' : 'data/bots/' + scope)
await Promise.each(dbRevs, rev => this.dbDriver.deleteRevision(rev.path, rev.revision))

await Promise.map(changes.filter(x => x.action === 'del'), async file => {
await this.dbDriver.deleteFile(file.path)
await invalidateFile(file.path)
})

await Promise.map(changes.filter(x => ['add', 'edit'].includes(x.action)), async file => {
const content = await this.diskDriver.readFile(path.join(tmpFolder, file.path))
await this.dbDriver.upsertFile(file.path, content, false)
await invalidateFile(file.path)
})
}
}

// TODO: refactor this
async listFileChanges(tmpFolder: string): Promise<FileChanges> {
const tmpDiskGlobal = this.custom(path.resolve(tmpFolder, 'data/global'))
const tmpDiskBot = (botId?: string) => this.custom(path.resolve(tmpFolder, 'data/bots', botId || ''))

// We need local and remote bot ids to correctly display changes
const localBotIds = (await this.bots().directoryListing('/', 'bot.config.json')).map(path.dirname)
const remoteBotIds = (await tmpDiskBot().directoryListing('/', 'bot.config.json')).map(path.dirname)
const botsIds = _.uniq([...remoteBotIds, ...localBotIds])

const uniqueFile = file => `${file.path} | ${file.revision}`

const botsFileChanges = await Promise.map(botsIds, async botId => {
const localRevs = await this.forBot(botId).listDiskRevisions()
const prodRevs = await this.forBot(botId).listDbRevisions()
const syncedRevs = _.intersectionBy(localRevs, prodRevs, uniqueFile)
const unsyncedFiles = _.uniq(_.differenceBy(prodRevs, syncedRevs, uniqueFile).map(x => x.path))
const getFileDiff = async file => {
try {
const localFile = (await this.diskDriver.readFile(path.join(tmpFolder, file))).toString()
const dbFile = (await this.dbDriver.readFile(file)).toString()

const diff = diffLines(dbFile, localFile)

return {
path: file,
action: 'edit',
add: _.sumBy(diff.filter(d => d.added), 'count'),
del: _.sumBy(diff.filter(d => d.removed), 'count')
}
} catch (err) {
// Todo better handling
this.logger.attachError(err).error(`Error while checking diff for "${file}"`)
return { path: file, action: 'edit' }
}
}

return { scope: botId, changes: unsyncedFiles }
})
// Adds the correct prefix to files so they are displayed correctly when reviewing changes
const getDirectoryFullPaths = async (scope: string, ghost) => {
const files = await ghost.directoryListing('/', '*.*', undefined, true)
const getPath = file =>
scope === 'global' ? path.join('data/global', file) : path.join('data/bots', scope, file)

return files.map(file => forceForwardSlashes(getPath(file)))
}

const localRevs = await this.global().listDiskRevisions()
const prodRevs = await this.global().listDbRevisions()
const syncedRevs = _.intersectionBy(localRevs, prodRevs, uniqueFile)
const unsyncedFiles = _.uniq(_.differenceBy(prodRevs, syncedRevs, uniqueFile).map(x => x.path))
const getFileChanges = async (scope: string, localGhost: ScopedGhostService, remoteGhost: ScopedGhostService) => {
const localRevs = await localGhost.listDiskRevisions()
const remoteRevs = await remoteGhost.listDbRevisions()
const syncedRevs = _.intersectionBy(localRevs, remoteRevs, uniqueFile)
const unsyncedFiles = _.uniq(_.differenceBy(remoteRevs, syncedRevs, uniqueFile).map(x => x.path))

const localFiles: string[] = await getDirectoryFullPaths(scope, localGhost)
const remoteFiles: string[] = await getDirectoryFullPaths(scope, remoteGhost)
const deleted = _.difference(remoteFiles, localFiles).map(x => ({ path: x, action: 'del' }))
const added = _.difference(localFiles, remoteFiles).map(x => ({ path: x, action: 'add' }))

const filterDeleted = file => !_.map([...deleted, ...added], 'path').includes(file)
const edited = (await Promise.map(unsyncedFiles.filter(filterDeleted), getFileDiff)).filter(
x => x.add !== 0 || x.del !== 0
)

return {
scope,
changes: [...added, ...deleted, ...edited]
}
}

const botsFileChanges = await Promise.map(botsIds, botId =>
getFileChanges(botId, tmpDiskBot(botId), this.forBot(botId))
)

return [...botsFileChanges, { scope: 'global', changes: unsyncedFiles }]
return [...botsFileChanges, await getFileChanges('global', tmpDiskGlobal, this.global())]
}

bots(): ScopedGhostService {
Expand Down
17 changes: 12 additions & 5 deletions src/bp/index.ts
Expand Up @@ -86,6 +86,7 @@ try {
},
argv => {
process.IS_PRODUCTION = argv.production || yn(process.env.BP_PRODUCTION) || yn(process.env.CLUSTER_ENABLED)
process.BPFS_STORAGE = process.core_env.BPFS_STORAGE || 'disk'

let defaultVerbosity = process.IS_PRODUCTION ? 0 : 2
if (!isNaN(Number(process.env.VERBOSITY_LEVEL))) {
Expand Down Expand Up @@ -124,7 +125,7 @@ try {
)
.command(
'pull',
'Sync pending changes from an external server running botpress to local files',
'Pull data from a remote server to your local file system',
{
url: {
description: 'Base URL of the botpress server from which you want to pull changes',
Expand All @@ -133,14 +134,14 @@ try {
},
authToken: {
alias: 'token',
description: 'your authorization token on the remote botpress server',
description: 'Authorization token on the remote botpress server',
// tslint:disable-next-line:no-null-keyword
default: null,
type: 'string'
},
targetDir: {
alias: 'dir',
description: 'target directory in which you want sync the changes. will be created if doesnt exist',
description: 'Target directory where the remote data will be stored',
default: path.join(__dirname, 'data'),
type: 'string'
}
Expand All @@ -152,16 +153,22 @@ try {
'Push local files to a remote botpress server',
{
url: {
description: 'url of the botpress server to which to push changes',
description: 'URL of the botpress server to which to push changes',
default: 'http://localhost:3000',
type: 'string'
},
authToken: {
alias: 'token',
description: 'your authorization token on the remote botpress server',
description: 'Authorization token on the remote botpress server',
// tslint:disable-next-line:no-null-keyword
default: null,
type: 'string'
},
targetDir: {
alias: 'dir',
description: 'The local directory containing the data you want to push on the remote server',
default: path.join(__dirname, 'data'),
type: 'string'
}
},
argv => require('./push').default(argv)
Expand Down

0 comments on commit 8bfb80a

Please sign in to comment.