Skip to content

Commit

Permalink
Merge pull request #1156 from botpress/ya-data-retention
Browse files Browse the repository at this point in the history
feat(storage): implement expiration date for user attributes
  • Loading branch information
allardy committed Nov 23, 2018
2 parents 73235a1 + 3026fe6 commit 7d2b9a0
Show file tree
Hide file tree
Showing 12 changed files with 305 additions and 10 deletions.
1 change: 1 addition & 0 deletions package.json
Expand Up @@ -28,6 +28,7 @@
"chalk": "^2.4.1",
"chokidar": "^2.0.4",
"cors": "^2.8.4",
"deep-diff": "^1.0.2",
"doctrine": "^2.1.0",
"errorhandler": "^1.5.0",
"eventemitter2": "^5.0.1",
Expand Down
10 changes: 10 additions & 0 deletions src/bp/core/app.inversify.ts
Expand Up @@ -13,6 +13,8 @@ import { applyDisposeOnExit } from './misc/inversify'
import { ModuleLoader } from './module-loader'
import { RepositoriesContainerModules } from './repositories/repositories.inversify'
import HTTPServer from './server'
import { DataRetentionJanitor } from './services/retention/janitor'
import { DataRetentionService } from './services/retention/service'
import { ServicesContainerModules } from './services/services.inversify'
import { Statistics } from './stats'
import { TYPES } from './types'
Expand Down Expand Up @@ -94,6 +96,14 @@ container
.bind<Statistics>(TYPES.Statistics)
.to(Statistics)
.inSingletonScope()
container
.bind<DataRetentionJanitor>(TYPES.DataRetentionJanitor)
.to(DataRetentionJanitor)
.inSingletonScope()
container
.bind<DataRetentionService>(TYPES.DataRetentionService)
.to(DataRetentionService)
.inSingletonScope()

const isPackaged = !!eval('process.pkg')
const isProduction = process.IS_PRODUCTION
Expand Down
11 changes: 10 additions & 1 deletion src/bp/core/botpress.ts
Expand Up @@ -29,6 +29,8 @@ import { EventEngine } from './services/middleware/event-engine'
import { StateManager } from './services/middleware/state-manager'
import { NotificationsService } from './services/notification/service'
import RealtimeService from './services/realtime'
import { DataRetentionJanitor } from './services/retention/janitor'
import { DataRetentionService } from './services/retention/service'
import { Statistics } from './stats'
import { TYPES } from './types'

Expand Down Expand Up @@ -68,7 +70,9 @@ export class Botpress {
@inject(TYPES.LoggerPersister) private loggerPersister: LoggerPersister,
@inject(TYPES.NotificationsService) private notificationService: NotificationsService,
@inject(TYPES.AppLifecycle) private lifecycle: AppLifecycle,
@inject(TYPES.StateManager) private stateManager: StateManager
@inject(TYPES.StateManager) private stateManager: StateManager,
@inject(TYPES.DataRetentionJanitor) private dataRetentionJanitor: DataRetentionJanitor,
@inject(TYPES.DataRetentionService) private dataRetentionService: DataRetentionService
) {
this.version = '12.0.1'
this.botpressPath = path.join(process.cwd(), 'dist')
Expand Down Expand Up @@ -159,6 +163,7 @@ export class Botpress {
await this.decisionEngine.processEvent(sessionId, event)
}

this.dataRetentionService.initialize()
this.stateManager.initialize()

const flowLogger = await this.loggerProvider('DialogEngine')
Expand All @@ -178,6 +183,10 @@ export class Botpress {
await this.logJanitor.start()
await this.dialogJanitor.start()

if (this.config!.dataRetention) {
await this.dataRetentionJanitor.start()
}

await this.lifecycle.setDone(AppLifecycleEvents.SERVICES_READY)
}

Expand Down
21 changes: 21 additions & 0 deletions src/bp/core/config/botpress.config.ts
Expand Up @@ -63,4 +63,25 @@ export type BotpressConfig = {
*/
licenseKey: string
sendUsageStats: boolean
/**
* When this feature is enabled, fields saved as user attributes will be automatically erased when they expires. The timer is reset each time the value is modified
* Setting a policy called "email": "30d" means that once an email is set, it will be removed in 30 days, unless it is changed in that timespan
*/
dataRetention?: DataRetentionConfig
}

export interface DataRetentionConfig {
/**
* The janitor will check for expired fields at the set interval (second, minute, hour, day)
* @example 1m
*/
janitorInterval: string
policies: RetentionPolicy
}

/**
* @example "profile.email": "30d"
*/
export type RetentionPolicy = {
[key: string]: string
}
4 changes: 3 additions & 1 deletion src/bp/core/database/tables/index.ts
Expand Up @@ -17,6 +17,7 @@ import {
AuthUsersTable,
BotsTable,
ChannelUsersTable,
DataRetentionTable,
MigrationsTable,
ServerMetadataTable
} from './server-wide'
Expand All @@ -36,7 +37,8 @@ const tables: (typeof Table)[] = [
GhostFilesTable,
GhostRevisionsTable,
NotificationsTable,
KeyValueStoreTable
KeyValueStoreTable,
DataRetentionTable
]

export default <(new (knex: Knex) => Table)[]>tables
19 changes: 19 additions & 0 deletions src/bp/core/database/tables/server-wide/data_retention.ts
@@ -0,0 +1,19 @@
import { Table } from 'core/database/interfaces'

export class DataRetentionTable extends Table {
name: string = 'data_retention'

async bootstrap() {
let created = false

await this.knex.createTableIfNotExists(this.name, table => {
table.text('channel').notNullable()
table.text('user_id').notNullable()
table.text('field_path').notNullable()
table.timestamp('expiry_date').notNullable()
table.timestamp('created_on').notNullable()
created = true
})
return created
}
}
1 change: 1 addition & 0 deletions src/bp/core/database/tables/server-wide/index.ts
Expand Up @@ -6,3 +6,4 @@ export * from './bots'
export * from './channel_users'
export * from './metadata'
export * from './migrations'
export * from './data_retention'
31 changes: 24 additions & 7 deletions src/bp/core/repositories/users.ts
@@ -1,10 +1,10 @@
import { Paging, User } from 'botpress/sdk'
import { DataRetentionService } from 'core/services/retention/service'
import { inject, injectable } from 'inversify'
import Knex from 'knex'

import Database from '../database'
import { TYPES } from '../types'

export interface UserRepository {
getOrCreate(channel: string, id: string): Knex.GetOrCreateResult<User>
updateAttributes(channel: string, id: string, attributes: any): Promise<void>
Expand All @@ -16,7 +16,10 @@ export interface UserRepository {
export class KnexUserRepository implements UserRepository {
private readonly tableName = 'srv_channel_users'

constructor(@inject(TYPES.Database) private database: Database) {}
constructor(
@inject(TYPES.Database) private database: Database,
@inject(TYPES.DataRetentionService) private dataRetentionService: DataRetentionService
) {}

async getOrCreate(channel: string, id: string): Knex.GetOrCreateResult<User> {
channel = channel.toLowerCase()
Expand Down Expand Up @@ -62,15 +65,29 @@ export class KnexUserRepository implements UserRepository {
return { result: newUser, created: true }
}

async updateAttributes(channel: string, id: string, attributes: any): Promise<void> {
async getAttributes(channel: string, user_id: string): Promise<any> {
const user = await this.database
.knex(this.tableName)
.where({ channel, user_id })
.limit(1)
.select('attributes')
.first()

return this.database.knex.json.get(user.attributes)
}

async updateAttributes(channel: string, user_id: string, attributes: any): Promise<void> {
channel = channel.toLowerCase()

if (await this.dataRetentionService.hasPolicy()) {
const originalAttributes = await this.getAttributes(channel, user_id)
await this.dataRetentionService.updateExpirationForChangedFields(channel, user_id, originalAttributes, attributes)
}

await this.database
.knex(this.tableName)
.update({
attributes: this.database.knex.json.set(attributes)
})
.where({ channel, user_id: id })
.update({ attributes: this.database.knex.json.set(attributes) })
.where({ channel, user_id })
}

async getAllUsers(paging?: Paging) {
Expand Down
55 changes: 55 additions & 0 deletions src/bp/core/services/retention/janitor.ts
@@ -0,0 +1,55 @@
import { Logger } from 'botpress/sdk'
import { UserRepository } from 'core/repositories'
import { inject, injectable, tagged } from 'inversify'
import _ from 'lodash'
import { Memoize } from 'lodash-decorators'

import { BotpressConfig } from '../../config/botpress.config'
import { ConfigProvider } from '../../config/config-loader'
import { TYPES } from '../../types'
import { Janitor } from '../janitor'

import { DataRetentionService } from './service'

@injectable()
export class DataRetentionJanitor extends Janitor {
private BATCH_SIZE = 250

constructor(
@inject(TYPES.Logger)
@tagged('name', 'RetentionJanitor')
protected logger: Logger,
@inject(TYPES.ConfigProvider) private configProvider: ConfigProvider,
@inject(TYPES.DataRetentionService) private dataRetentionService: DataRetentionService,
@inject(TYPES.UserRepository) private userRepo: UserRepository
) {
super(logger)
}

@Memoize()
private async getBotpressConfig(): Promise<BotpressConfig> {
return this.configProvider.getBotpressConfig()
}

protected async getInterval(): Promise<string> {
const config = await this.getBotpressConfig()
return (config.dataRetention && config.dataRetention.janitorInterval) || '15m'
}

protected async runTask(): Promise<void> {
let expired = await this.dataRetentionService.getExpired(this.BATCH_SIZE)

while (expired.length > 0) {
await Promise.mapSeries(expired, async ({ channel, user_id, field_path }) => {
const { result: user } = await this.userRepo.getOrCreate(channel, user_id)

await this.userRepo.updateAttributes(channel, user.id, _.omit(user.attributes, field_path))
await this.dataRetentionService.delete(channel, user_id, field_path)
})

if (expired.length >= this.BATCH_SIZE) {
expired = await this.dataRetentionService.getExpired(this.BATCH_SIZE)
}
}
}
}
110 changes: 110 additions & 0 deletions src/bp/core/services/retention/service.ts
@@ -0,0 +1,110 @@
import { RetentionPolicy } from 'core/config/botpress.config'
import { ConfigProvider } from 'core/config/config-loader'
import Database from 'core/database'
import { TYPES } from 'core/types'
import diff from 'deep-diff'
import { inject, injectable } from 'inversify'
import _ from 'lodash'
import moment from 'moment'
import ms from 'ms'

import { getPaths } from './util'

export interface ExpiredData {
channel: string
user_id: string
field_path: string
}

@injectable()
export class DataRetentionService {
private readonly tableName = 'data_retention'
private policies: RetentionPolicy | undefined
private DELETED_ATTR = 'D'

constructor(
@inject(TYPES.ConfigProvider) private configProvider: ConfigProvider,
@inject(TYPES.Database) private database: Database
) {}

async initialize(): Promise<void> {
const config = await this.configProvider.getBotpressConfig()
this.policies = config.dataRetention && config.dataRetention.policies
}

hasPolicy(): boolean {
return !_.isEmpty(this.policies)
}

async updateExpirationForChangedFields(
channel: string,
user_id: string,
beforeAttributes: any,
afterAttributes: any
) {
const differences = diff(getPaths(beforeAttributes), getPaths(afterAttributes))
if (!differences || !this.policies) {
return
}

const changedPaths = _.flatten(differences.filter(diff => diff.kind != this.DELETED_ATTR).map(diff => diff.path))
if (!changedPaths.length) {
return
}

for (const field in this.policies) {
if (changedPaths.indexOf(field) > -1) {
const expiry = moment()
.add(ms(this.policies[field]), 'ms')
.toDate()

if (await this.get(channel, user_id, field)) {
await this.update(channel, user_id, field, expiry)
} else {
await this.insert(channel, user_id, field, expiry)
}
}
}
}

private async get(channel: string, user_id: string, field_path: string) {
return await this.database
.knex(this.tableName)
.where({ channel, user_id, field_path })
.limit(1)
.select('expiry_date')
.first()
}

private async insert(channel: string, user_id: string, field_path: string, expiry_date: Date) {
await this.database.knex(this.tableName).insert({
channel,
user_id,
field_path,
expiry_date: this.database.knex.date.format(expiry_date),
created_on: this.database.knex.date.now()
})
}

private async update(channel: string, user_id: string, field_path: string, expiry_date: Date) {
await this.database
.knex(this.tableName)
.update({ expiry_date: this.database.knex.date.format(expiry_date) })
.where({ channel, user_id, field_path })
}

async delete(channel: string, user_id: string, field_path: string): Promise<void> {
await this.database
.knex(this.tableName)
.where({ channel, user_id, field_path })
.del()
}

async getExpired(batchSize): Promise<ExpiredData[]> {
return await this.database
.knex(this.tableName)
.andWhere(this.database.knex.date.isBefore('expiry_date', new Date()))
.select('channel', 'user_id', 'field_path')
.limit(batchSize)
}
}

0 comments on commit 7d2b9a0

Please sign in to comment.