Skip to content
This repository has been archived by the owner on Jan 19, 2021. It is now read-only.

Commit

Permalink
add lock
Browse files Browse the repository at this point in the history
  • Loading branch information
jochem-brouwer committed Sep 6, 2020
1 parent 49beeca commit 9b5bbf8
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 24 deletions.
4 changes: 2 additions & 2 deletions src/baseTrie.ts
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ export class Trie {
const childRef = child[1] as Buffer
const childKey = key.concat(keyExtension)
const priority = childKey.length
taskExecutor.execute(priority, async (taskCallback: Function) => {
await taskExecutor.execute(priority, async (taskCallback: Function) => {
const childNode = await self._lookupNode(childRef)
taskCallback()
if (childNode) {
Expand All @@ -275,7 +275,7 @@ export class Trie {
const childKey = key.slice()
childKey.push(childIndex)
const priority = childKey.length
taskExecutor.execute(priority, async (taskCallback: Function) => {
await taskExecutor.execute(priority, async (taskCallback: Function) => {
const childNode = await self._lookupNode(childRef)
taskCallback()
if (childNode) {
Expand Down
42 changes: 31 additions & 11 deletions src/prioritizedTaskExecutor.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import Semaphore from 'semaphore-async-await'

interface Task {
priority: number
fn: Function
Expand All @@ -10,6 +12,8 @@ export class PrioritizedTaskExecutor {
private currentPoolSize: number
/** The task queue */
private queue: Task[]
/** The Lock */
private lock: Semaphore

/**
* Executes tasks up to maxPoolSize at a time, other items are put in a priority queue.
Expand All @@ -21,6 +25,7 @@ export class PrioritizedTaskExecutor {
this.maxPoolSize = maxPoolSize
this.currentPoolSize = 0
this.queue = []
this.lock = new Semaphore(1)
}

/**
Expand All @@ -29,17 +34,22 @@ export class PrioritizedTaskExecutor {
* @param priority The priority of the task
* @param fn The function that accepts the callback, which must be called upon the task completion.
*/
execute(priority: number, fn: Function) {
if (this.currentPoolSize < this.maxPoolSize) {
this.currentPoolSize++
fn(() => {
this.currentPoolSize--
if (this.queue.length > 0) {
const item = this.queue.shift()
this.execute(item!.priority, item!.fn)
async execute(priority: number, fn: Function) {
let self = this
function runTask() {
self.currentPoolSize++
fn(async () => {
self.currentPoolSize--
if (self.queue.length > 0) {
const item = self.queue.shift()
await self.execute(item!.priority, item!.fn)
}
})
}
if (this.currentPoolSize < this.maxPoolSize) {
runTask()
} else {
await this.lock.wait()
if (this.queue.length == 0) {
this.queue.push({ priority, fn })
} else {
Expand All @@ -50,15 +60,24 @@ export class PrioritizedTaskExecutor {
return Math.floor(left + (right - left) / 2)
}
while (true) {
// note that there is a special case: it could be that during sorting, a Task is finished (reducing currentPoolSize by 1), but this Task was not yet inserted
// therefore, if we want to insert the item we explicitly check that we indeed should Queue it, if not, we execute it and do not insert it.
let index = mid()
let value = this.queue[index].priority
console.log(left, right, index, value)
if (value == priority) {
this.queue.splice(index, 0, { priority, fn })
if (this.currentPoolSize < this.maxPoolSize) {
runTask()
} else {
this.queue.splice(index, 0, { priority, fn })
}
break
}
if (left == right) {
this.queue.splice(left, 0, { priority, fn })
if (this.currentPoolSize < this.maxPoolSize) {
runTask()
} else {
this.queue.splice(left, 0, { priority, fn })
}
break
}

Expand All @@ -69,6 +88,7 @@ export class PrioritizedTaskExecutor {
}
}
}
this.lock.signal()
}
}

Expand Down
22 changes: 11 additions & 11 deletions test/prioritizedTaskExecutor.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,33 @@ import * as tape from 'tape'
import { PrioritizedTaskExecutor } from '../src/prioritizedTaskExecutor'

tape('prioritized task executor test', function (t: any) {
t.test('should execute tasks in the right order', (st: any) => {
t.test('should execute tasks in the right order', async (st: any) => {
const taskExecutor = new PrioritizedTaskExecutor(2)
const tasks = [1, 2, 3, 4]
const callbacks = [] as any
const executionOrder = [] as any
tasks.forEach(function (task) {
taskExecutor.execute(task, function (cb: Function) {
for (let task of tasks) {
await taskExecutor.execute(task, function (cb: Function) {
executionOrder.push(task)
callbacks.push(cb)
})
})
}

callbacks.forEach(function (callback: Function) {
callback()
})
for (let callback of callbacks) {
await callback()
}

const expectedExecutionOrder = [1, 2, 4, 3]
st.deepEqual(executionOrder, expectedExecutionOrder)
st.end()
})

t.test('should queue tasks in the right order', (st: any) => {
t.test('should queue tasks in the right order', async (st: any) => {
const priorityList = [0, 1, 0, 2, 0, 1, 0, 2, 2, 1]
const PTE = new PrioritizedTaskExecutor(0) // this ensures that no task actually gets executed, so this essentially just checks the sort algorithm
priorityList.map((priority) => {
PTE.execute(priority, () => {})
})
for (let priority of priorityList) {
await PTE.execute(priority, () => {})
}
// have to cast the PTE as <any> to access the private queue
st.deepEqual(
(<any>PTE).queue.map((task: any) => task.priority),
Expand Down

0 comments on commit 9b5bbf8

Please sign in to comment.