Skip to content

Commit

Permalink
fix(distributed): canceling a task that is not handled locally waits …
Browse files Browse the repository at this point in the history
…for the task to be canceled (#180)

* fix(distributed): task cancelation waits for instance to answer

* fix: allow training to start

* chore: allow setting max linting through CLI

* fix: canceling a task works when task is not handled on local instance

* chore(e2e): added a test that ensures canceling a nonexistent training fails with 404

* chore(gh): add gh check to make sure trainings are distributed

* chore: refactor e2e tests + fix race condition in distributed queue (#181)
  • Loading branch information
franklevasseur committed Feb 22, 2022
1 parent 07f6aad commit 98afa80
Show file tree
Hide file tree
Showing 25 changed files with 416 additions and 128 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/bench.yml
Expand Up @@ -23,5 +23,5 @@ jobs:
- name: Run Regression Test
run: |
yarn start lang --dim 100 &
sleep 15s && yarn start nlu --doc false --verbose 0 --ducklingEnabled false --languageURL http://localhost:3100 &
sleep 15s && yarn start nlu --doc false --log-level "critical" --ducklingEnabled false --languageURL http://localhost:3100 &
sleep 25s && yarn bench --skip="clinc150"
168 changes: 150 additions & 18 deletions .github/workflows/e2e.yml
@@ -1,15 +1,67 @@
name: E2E
on: [pull_request]
jobs:
run_e2e:
name: Run e2e tests using binary executable file
fs:
name: file system
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@master
- uses: actions/setup-node@v1
with:
node-version: '16.13.0'
- name: Fetch Node Packages
run: |
yarn --verbose
- name: Build
run: |
yarn build
- name: package
run: |
yarn package --linux
- name: Rename binary
id: rename_binary
run: |
bin_original_name=$(node -e "console.log(require('./scripts/utils/binary').getFileName())")
echo "Moving ./dist/$bin_original_name to ./nlu ..."
mv ./dist/$bin_original_name ./nlu
- name: Download language models
run: |
./nlu lang download --lang en --dim 25
- name: Start Language Server
run: |
./nlu lang --dim 25 &
echo "Lang Server started on pid $!"
- name: Sleep
uses: jakejarvis/wait-action@master
with:
time: '15s'
- name: Start NLU Server
run: |
./nlu \
--log-level "critical" \
--ducklingEnabled false \
--languageURL http://localhost:3100 \
--port 3200 &
nlu_pid=$!
echo "NLU Server started on pid $nlu_pid"
- name: Sleep
uses: jakejarvis/wait-action@master
with:
time: '15s'
- name: Run Tests
run: |
yarn e2e --nlu-endpoint http://localhost:3200
db:
name: database
runs-on: ubuntu-latest
services:
postgres:
# Docker Hub image
image: postgres
env:
POSTGRES_DB: botpress-nlu
POSTGRES_DB: botpress-nlu-1
POSTGRES_PASSWORD: postgres
POSTGRES_USER: postgres
POSTGRES_PORT: 5432
Expand Down Expand Up @@ -53,31 +105,111 @@ jobs:
uses: jakejarvis/wait-action@master
with:
time: '15s'
- name: Run Tests on File System
- name: Start NLU Server
run: |
./nlu \
--verbose 0 \
--log-level "critical" \
--ducklingEnabled false \
--languageURL http://localhost:3100 \
--port 3200 &
--port 3201 \
--dbURL postgres://postgres:postgres@localhost:5432/botpress-nlu-1 & \
nlu_pid=$!
echo "NLU Server started on pid $nlu_pid"
- name: Sleep
uses: jakejarvis/wait-action@master
with:
time: '15s'
- name: Run Tests
run: |
yarn e2e --nlu-endpoint http://localhost:3201
cluster:
name: cluster
runs-on: ubuntu-latest
services:
postgres:
# Docker Hub image
image: postgres
env:
POSTGRES_DB: botpress-nlu-2
POSTGRES_PASSWORD: postgres
POSTGRES_USER: postgres
POSTGRES_PORT: 5432
ports:
- 5432:5432
# Set health checks to wait until postgres has started
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
steps:
- name: Checkout code
uses: actions/checkout@master
- uses: actions/setup-node@v1
with:
node-version: '16.13.0'
- name: Fetch Node Packages
run: |
yarn --verbose
- name: Build
run: |
yarn build
- name: package
run: |
yarn package --linux
- name: Rename binary
id: rename_binary
run: |
bin_original_name=$(node -e "console.log(require('./scripts/utils/binary').getFileName())")
echo "Moving ./dist/$bin_original_name to ./nlu ..."
mv ./dist/$bin_original_name ./nlu
- name: Download language models
run: |
./nlu lang download --lang en --dim 25
- name: Start Language Server
run: |
./nlu lang --dim 25 &
echo "Lang Server started on pid $!"
- name: Sleep
uses: jakejarvis/wait-action@master
with:
time: '15s'

sleep 10s && \
yarn e2e --nlu-endpoint http://localhost:3200 && \
kill -9 $nlu_pid
- name: Start First NLU Server on port 3202
run: |
./nlu \
--log-level "critical" \
--ducklingEnabled false \
--maxTraining 0 \
--maxLinting 0 \
--languageURL http://localhost:3100 \
--port 3202 \
--dbURL postgres://postgres:postgres@localhost:5432/botpress-nlu-2 & \
nlu_pid1=$!
echo "NLU Server started on pid $nlu_pid1"
- name: Run Tests on Database
- name: Sleep
uses: jakejarvis/wait-action@master
with:
time: '5s'

- name: Start Second NLU Server on port 3203
run: |
./nlu \
--verbose 0 \
--log-level "critical" \
--ducklingEnabled false \
--languageURL http://localhost:3100 \
--port 3200 \
--dbURL postgres://postgres:postgres@localhost:5432/botpress-nlu & \
nlu_pid=$!
echo "NLU Server started on pid $nlu_pid"
--port 3203 \
--dbURL postgres://postgres:postgres@localhost:5432/botpress-nlu-2 & \
nlu_pid2=$!
echo "NLU Server started on pid $nlu_pid2"
sleep 10s && \
yarn e2e --nlu-endpoint http://localhost:3200 && \
kill -9 $nlu_pid
- name: Sleep
uses: jakejarvis/wait-action@master
with:
time: '15s'

- name: Run Tests
run: |
yarn e2e --nlu-endpoint http://localhost:3202
89 changes: 37 additions & 52 deletions packages/distributed/src/queues/base-queue.ts
Expand Up @@ -4,7 +4,7 @@ import _ from 'lodash'
import moment from 'moment'
import { nanoid } from 'nanoid'

import { TaskAlreadyStartedError, TaskNotFoundError } from './errors'
import { TaskAlreadyStartedError } from './errors'
import { createTimer, InterruptTimer } from './interrupt'
import {
Task,
Expand All @@ -18,19 +18,20 @@ import {
TaskQueue as ITaskQueue
} from './typings'

export class BaseTaskQueue<TId, TInput, TData, TError> implements ITaskQueue<TId, TInput, TData, TError> {
export abstract class BaseTaskQueue<TId, TInput, TData, TError> implements ITaskQueue<TId, TInput, TData, TError> {
private _schedulingTimmer!: InterruptTimer<[]>
protected _clusterId: string = nanoid()

constructor(
protected _taskRepo: SafeTaskRepository<TId, TInput, TData, TError>,
private _taskRunner: TaskRunner<TId, TInput, TData, TError>,
private _logger: Logger,
private _idToString: (id: TId) => string,
private _options: QueueOptions<TId, TInput, TData, TError>
protected _taskRunner: TaskRunner<TId, TInput, TData, TError>,
protected _logger: Logger,
protected _idToString: (id: TId) => string,
protected _options: QueueOptions<TId, TInput, TData, TError>
) {}

/**
 * Prepares the queue for use: logs this instance's cluster id, initializes the
 * task repository, then creates the scheduling timer around `_runSchedulerInterrupt`
 * with twice `maxProgressDelay` as its second argument.
 */
public async initialize() {
  const clusterId = this._clusterId
  this._logger.debug(`cluster id: "${clusterId}"`)

  await this._taskRepo.initialize()

  const schedulerInterrupt = this._runSchedulerInterrupt.bind(this)
  const timerDelay = this._options.maxProgressDelay * 2
  this._schedulingTimmer = createTimer(schedulerInterrupt, timerDelay)
}
Expand Down Expand Up @@ -68,64 +69,40 @@ export class BaseTaskQueue<TId, TInput, TData, TError> implements ITaskQueue<TId
void this.runSchedulerInterrupt()
}

/**
 * Cancels the task identified by `taskId`; all the work happens inside a single
 * repository transaction labelled 'cancelTask'.
 *
 * Outcomes, in the order they are checked:
 * - no task stored for `taskId`: throws `TaskNotFoundError`;
 * - task is pending, or is among the tasks returned by `_getZombies`: it is
 *   persisted again with status 'canceled';
 * - task's `cluster` differs from this instance's cluster id: a debug line is
 *   logged and nothing is written;
 * - task is running: it is handed to `_taskRunner.cancel`;
 * - anything else: nothing happens.
 *
 * NOTE(review): a `public abstract cancelTask` declaration directly follows this
 * method in the file — confirm which of the two is meant to remain.
 *
 * @param taskId id of the task to cancel
 * @throws TaskNotFoundError when the repository returns no task for `taskId`
 */
public async cancelTask(taskId: TId): Promise<void> {
// string form of the id, used for the error message and for matching zombies
const taskKey = this._idToString(taskId)
return this._taskRepo.inTransaction(async (repo) => {
const currentTask = await repo.get(taskId)
if (!currentTask) {
throw new TaskNotFoundError(taskKey)
}

// a task counts as a zombie when its stringified form equals the requested key
const zombieTasks = await this._getZombies(repo)
const isZombie = !!zombieTasks.find((t) => this._idToString(t) === taskKey)

// pending and zombie tasks are canceled by rewriting their status in the repository
if (currentTask.status === 'pending' || isZombie) {
const newTask = { ...currentTask, status: <TaskStatus>'canceled' }
return repo.set(newTask)
}

// tasks attached to another cluster id are left untouched; only a debug log is emitted
if (currentTask.cluster !== this._clusterId) {
this._logger.debug(`Task "${taskId}" was not launched on this instance`)
return
}

// running task attached to this cluster id: cancellation is delegated to the task runner
if (currentTask.status === 'running') {
return this._taskRunner.cancel(currentTask)
}
}, 'cancelTask')
}
public abstract cancelTask(taskId: TId): Promise<void>

/**
 * Runs the scheduling timer once and reports any failure through the logger
 * instead of propagating it.
 *
 * Failures are logged rather than rethrown because this class invokes the method
 * as a floating promise (`void this.runSchedulerInterrupt()`), where a rejection
 * would otherwise go unhandled.
 *
 * @returns the value produced by `_schedulingTimmer.run()`, or `undefined` when it failed
 */
protected async runSchedulerInterrupt() {
  try {
    // `await` keeps the call inside the try block until it settles; a bare
    // `return promise` would leave the block immediately and a later rejection
    // would never reach the catch below.
    // NOTE(review): assumes `run()` returns a promise — harmless if it does not.
    return await this._schedulingTimmer.run()
  } catch (thrown) {
    // anything can be thrown in JS; normalize to an Error before attaching it to the log
    const err = thrown instanceof Error ? thrown : new Error(`${thrown}`)
    this._logger.attachError(err).error('An error occured when running scheduler interrupt.')
  }
}

/**
 * One scheduler pass, executed inside a repository transaction labelled
 * '_runSchedulerInterrupt'.
 *
 * Steps, in order:
 * 1. calls `_queueBackZombies(repo)`;
 * 2. returns early when the number of 'running' tasks attached to this cluster id
 *    has reached `options.maxTasks`;
 * 3. takes the first task returned by the 'pending' query (returns if there is none),
 *    marks it 'running', attaches it to this cluster id and persists it;
 * 4. starts `_runTask(task)` without awaiting it.
 *
 * NOTE(review): zombie handling appears twice in this body (the `_queueBackZombies`
 * call and the inline `_getZombies` section). This looks like old and new versions of
 * the same logic side by side — confirm which one should stay.
 */
private _runSchedulerInterrupt = async () => {
return this._taskRepo.inTransaction(async (repo) => {
await this._queueBackZombies(repo)

// capacity check: only tasks running under this instance's cluster id are counted
const localTasks = await repo.query({ cluster: this._clusterId, status: 'running' })
if (localTasks.length >= this._options.maxTasks) {
this._logger.debug(
`[${this._clusterId}/${this._options.queueId}] max allowed of task already launched in queue.`
)
return
}

// NOTE(review): second zombie pass, see the note in the header comment
const zombieTasks = await this._getZombies(repo)
if (zombieTasks.length) {
this._logger.debug(`Queuing back ${zombieTasks.length} tasks because they seem to be zombies.`)

// each zombie is rewritten with status 'zombie', this cluster id and the initial progress
const progress = this._options.initialProgress
const newState = { status: <TaskStatus>'zombie', cluster: this._clusterId, progress }
await Bluebird.each(zombieTasks, (z) => repo.set({ ...z, ...newState }))
}

const pendings = await repo.query({ status: 'pending' })
if (pendings.length <= 0) {
return
}

// first element of the query result; any ordering is decided by the repository, not here
const task = pendings[0]
task.status = 'running'

// claim the task for this instance and persist the claim before starting it
task.cluster = this._clusterId
await repo.set(task)

// floating promise to return fast from scheduler interrupt
// floating promise to return fast from scheduler interrupt and to prevent deadlock
void this._runTask(task)
}, '_runSchedulerInterrupt')
}
Expand All @@ -135,9 +112,7 @@ export class BaseTaskQueue<TId, TInput, TData, TError> implements ITaskQueue<TId
this._logger.debug(`task "${taskKey}" is about to start.`)

const updateTask = _.throttle(async () => {
await this._taskRepo.inTransaction(async (repo) => {
return repo.set(task)
}, 'progressCallback')
await this._taskRepo.inTransaction((repo) => repo.set(task), 'progressCallback')
}, this._options.progressThrottle)

try {
Expand All @@ -153,9 +128,7 @@ export class BaseTaskQueue<TId, TInput, TData, TError> implements ITaskQueue<TId
updateTask.flush()

if (terminatedTask) {
await this._taskRepo.inTransaction(async (repo) => {
return repo.set(terminatedTask)
}, '_task_terminated')
await this._taskRepo.inTransaction((repo) => repo.set(terminatedTask), '_task_terminated')
}
} catch (thrown) {
updateTask.flush()
Expand All @@ -168,8 +141,20 @@ export class BaseTaskQueue<TId, TInput, TData, TError> implements ITaskQueue<TId
}
}

private _getZombies = (repo: TaskRepository<TId, TInput, TData, TError>) => {
protected _queueBackZombies = async (repo: TaskRepository<TId, TInput, TData, TError>) => {
const zombieThreshold = moment().subtract(this._options.maxProgressDelay, 'ms').toDate()
return repo.queryOlderThan({ status: 'running' }, zombieThreshold)
const newZombies = await repo.queryOlderThan({ status: 'running' }, zombieThreshold)
if (newZombies.length) {
this._logger.debug(`Queuing back ${newZombies.length} tasks because they seem to be zombies.`)

const progress = this._options.initialProgress
const newState = { status: <TaskStatus>'zombie', cluster: this._clusterId, progress }
await Bluebird.each(newZombies, (z) => repo.set({ ...z, ...newState }))
}
}

/**
 * Tells whether a task's status is one of 'running', 'pending' or 'zombie'.
 *
 * @param task task whose `status` is inspected
 * @returns `true` for those three statuses, `false` for any other
 */
protected _isCancelable = (task: Task<TId, TInput, TData, TError>) => {
  switch (task.status) {
    case 'running':
    case 'pending':
    case 'zombie':
      return true
    default:
      return false
  }
}
}
6 changes: 6 additions & 0 deletions packages/distributed/src/queues/errors.ts
Expand Up @@ -4,6 +4,12 @@ export class TaskNotFoundError extends Error {
}
}

/**
 * Error whose message states that no running or pending task exists for the given id.
 */
export class TaskNotRunning extends Error {
  /**
   * @param taskId identifier appended to the error message
   */
  constructor(taskId: string) {
    const message = 'no current running or pending task for model: ' + taskId
    super(message)
  }
}

export class TaskAlreadyStartedError extends Error {
constructor(taskId: string) {
super(`Training "${taskId}" already started...`)
Expand Down
2 changes: 1 addition & 1 deletion packages/distributed/src/queues/index.ts
Expand Up @@ -2,4 +2,4 @@ export * from './typings'

export { PGDistributedTaskQueue } from './pg-distributed-queue'
export { LocalTaskQueue } from './local-queue'
export { TaskAlreadyStartedError, TaskNotFoundError } from './errors'
export { TaskAlreadyStartedError, TaskNotFoundError, TaskNotRunning } from './errors'

0 comments on commit 98afa80

Please sign in to comment.