Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Harness Test

  • Loading branch information...
commit de0667113003a30786b894b17bddf4ba2b187b74 1 parent f834d4e
@baoshan authored
View
3  .gitignore
@@ -2,5 +2,4 @@
*.dmp
*.rdb
node_modules
-lib/server.js
-lib/fairy.js
+lib/*.js
View
153 lib/server/fairy.js
@@ -1,153 +0,0 @@
-// Generated by CoffeeScript 1.3.1
-(function() {
- var arr, bind, detail_bind, init, select_index, statistics, timer_id;
-
- arr = ['<table class="table table-bordered overview">', '<thead><tr><th>Queue</th><th>Workers</th><th>Avg. Time</th><th>Total</th><th>Finished</th><th>Processing</th><th>Pending</th><th>Failed</th><th>Blocked</th><th>Schedule</th><th>Clear</th></tr></thead>', '<tbody>', '<% _.each(data, function(item){ %>', '<tr key=mykey>', '<td><%= item.name %></td><td><%= item.workers%></td><td><%= item.average_pending_time%></td><td><span><%= item.total.tasks%></span><span>/</span><span><%= item.total.groups%><span></td><td><%= item.finished_tasks%></td><td><%= item.processing_tasks%></td><td><%= item.pending_tasks%></td><td><%= item.failed_tasks%></td><td><span><%= item.blocked.tasks%></span><span>/</span><span><%= item.blocked.groups%><span></td><td><button class="btn_reschedule">Schedule</button></td><td><button class="btn_clear">Clear</button></td>', '</tr>', '<%})%>', '<tr>', '<td>Total</td><td><%= _.reduce(data, function(memo, item){ return memo + Number(item.workers); }, 0)%></td><td><%= _.reduce(data, function(memo, item){ return memo + Number(item.average_pending_time); }, 0)%></td><td><span><%= _.reduce(data, function(memo, item){ return memo + Number(item.total.tasks); }, 0)%></span><span>/</span><span><%= _.reduce(data, function(memo, item){ return memo + Number(item.total.groups); }, 0) %></span></td><td><%= _.reduce(data, function(memo, item){ return memo + Number(item.finished_tasks); }, 0)%></td><td><%= _.reduce(data, function(memo, item){ return memo + Number(item.processing_tasks); }, 0)%></td><td><%= _.reduce(data, function(memo, item){ return memo + Number(item.pending_tasks); }, 0)%></td><td><%= _.reduce(data, function(memo, item){ return memo + Number(item.failed_tasks); }, 0)%></td><td><span><%= _.reduce(data, function(memo, item){ return memo + Number(item.blocked.tasks); }, 0)%></span><span>/</span><span><%= _.reduce(data, function(memo, item){ return memo + Number(item.blocked.groups); }, 0)%></span></td><td>&nbsp;</td><td>&nbsp;</td>', '</tr>', '</tbody>', '</table>'];
-
- statistics = [];
-
- select_index = 0;
-
- timer_id = 0;
-
- init = function() {
- console.log((new Date).toString());
- return $.ajax({
- type: 'GET',
- url: '/api/queues/statistics',
- success: function(data) {
- var name, select_value;
- statistics = data;
- $('#m_statistics').html(_.template(arr.join(''), {
- data: data
- }));
- if ($('#queque_detail').is(":visible")) {
- $($('#m_statistics').find('tr')[select_index]).attr("id", "active");
- name = $($($('#m_statistics').find('tr')[select_index]).find('td:first')).html();
- detail_bind(name);
- }
- select_value = $("select").find("option:selected").text();
- return timer_id = setTimeout((function() {
- return init();
- }), select_value.substring(0, select_value.length - 1) * 1000);
- }
- });
- };
-
- $(document).ready(function() {
- $('select').find("option:nth-child(1)").attr("selected", "true");
- $('#queque_detail').hide();
- init();
- return bind();
- });
-
- bind = function() {
- $('#m_statistics tr[key=mykey]').live('click', function() {
- var name;
- $($('#m_statistics').find('tr')[select_index]).removeAttr('id');
- $(this).attr("id", "active");
- name = $($(this).find('td')[0]).html();
- select_index = $(this).parent().index();
- detail_bind(name);
- return $('#queque_detail').show();
- });
- return ['reschedule', 'clear'].map(function(command) {
- return (function(command) {
- return $("#m_statistics .btn_" + command).live('click', function(event) {
- var name, that;
- event.stopPropagation();
- name = $(this).parent().parent().find('td:first').html();
- that = this;
- return $.ajax({
- type: 'POST',
- url: '/api/queues/' + name + ("/" + command),
- success: function(result) {
- var index;
- index = $(that).parent().parent().index();
- $(that).parent().parent().html(_.template(arr[5], {
- item: result
- }));
- statistics[index] = result;
- return $('#m_statistics tr:last').html(_.template(arr[9], {
- data: statistics
- }));
- }
- });
- });
- })(command);
- });
- };
-
- detail_bind = function(name) {
- return ['statistics', 'recently_finished_tasks', 'failed_tasks', 'slowest_tasks', 'processing_tasks', 'workers'].map(function(command) {
- return (function(command) {
- return $.ajax({
- type: 'GET',
- url: '/api/queues/' + name + ("/" + command),
- success: function(results) {
- var param;
- param = {};
- param[command] = results;
- $("#s_" + command).html(_.template($("#tb_" + command + "_template").html(), param));
- if (command === 'failed_tasks') {
- $(".failed_popover").find(".nav-tabs>li:first").addClass("active");
- return $(".failed_popover").find(".tab-content>div:first").addClass("active");
- }
- }
- });
- })(command);
- });
- };
-
- $("select").change(function() {
- clearTimeout(timer_id);
- return init();
- });
-
- $('.icon-th').click(function() {
- $('#workers + .tabbable').addClass('xz');
- $(this).addClass('active');
- return $('.icon-th-large').removeClass('active');
- });
-
- $('.icon-th-large').click(function() {
- $('#workers + .tabbable').removeClass('xz');
- $(this).addClass('active');
- return $('.icon-th').removeClass('active');
- });
-
- $(document).scroll(function() {
- var scroll_top;
- scroll_top = $(document).scrollTop();
- if (scroll_top > 40) {
- return $('h1').addClass("h1_shadow");
- } else {
- return $('h1').removeClass("h1_shadow");
- }
- });
-
- this.parse_milliseconds = function(milli) {
- var second;
- second = milli / 1000;
- if (second < 1) {
- return milli + 'ms';
- }
- if ((1 < second && second < 60)) {
- return Math.floor(second) + 's';
- }
- if (second > 60) {
- return Math.floor(second / 60) + 'm' + ':' + Math.floor(second % 60) + 's';
- }
- };
-
- this.id_factory = function() {
- var i;
- i = 0;
- return {
- "new": function() {
- return i++;
- }
- };
- };
-
-}).call(this);
View
218 src/fairy.coffee
@@ -107,21 +107,27 @@ enter_cleanup_mode = ->
else
return process.exit()
-# When `SIGINT` (e.g. `Control-C`), `SIGHUP` or `SIGUSR2` is received,
-# gracefully exit by notifying all workers entering cleanup mode and exit after
-# all cleaned up.
+# When below signals are captured, gracefully exit the program by notifying all
+# workers entering cleanup mode and exit after all are cleaned up.
+#
+# + `SIGINT` (e.g. `Control-C`)
+# + `SIGHUP`
+# + `SIGQUIT`
+# + `SIGUSR1`
+# + `SIGUSR2`
+# + `SIGTERM`
+# + `SIGABRT`
process.on 'SIGINT', enter_cleanup_mode
process.on 'SIGHUP', enter_cleanup_mode
process.on 'SIGQUIT', enter_cleanup_mode
-process.on 'SIGSTOP', enter_cleanup_mode
process.on 'SIGUSR1', enter_cleanup_mode
process.on 'SIGUSR2', enter_cleanup_mode
process.on 'SIGTERM', enter_cleanup_mode
process.on 'SIGABRT', enter_cleanup_mode
-# When `uncaughtException` captured, **Fairy** can not tell if this is caught by
-# the handling function, as well as which queue cause the exception. **Fairy**
-# will fail all processing tasks and block the according group.
+# When `uncaughtException` is captured, **Fairy** can not tell if this is caught
+# by the handling function, as well as which queue cause the exception.
+# **Fairy** will fail all processing tasks and block the according group.
process.on 'uncaughtException', (err) ->
console.log 'Exception:', err.stack
console.log 'Fairy workers will block their processing groups before exit.' if registered_workers.length
@@ -131,8 +137,10 @@ process.on 'uncaughtException', (err) ->
process.on 'exit', ->
console.log "Fairy cleaned up, exiting..." if cleanup_required
+
# ## Helper Methods
+
# ### Get Public IP
#
# **Fairy** embed public IP address of workers' environment in workers' name to
@@ -143,6 +151,7 @@ server_ip = ->
return address.address if not address.internal and address.family is 'IPv4'
return 'UNKNOWN_IP'
+
# ## Create Redis Client
create_client = (options) ->
client = redis.createClient options.port, options.host, options.options
@@ -237,6 +246,7 @@ class Fairy
result[i] = statistics
callback null, result if callback unless --total_queues
+
# ## Class Queue
# Objects of class `Queue` handles:
@@ -261,6 +271,7 @@ class Fairy
# queues = fairy.queues()
class Queue
+
# ### Constructor
# The constructor of class `Queue` stores the Redis connection and the name
@@ -268,6 +279,7 @@ class Queue
constructor: (@fairy, @name) ->
@redis = fairy.redis
+
# ### Function to Resolve Key Name
# **Private** method to generate (`FAIRY`) prefixed and (queue name) suffixed keys. Keys
@@ -286,22 +298,24 @@ class Queue
# + `STATISTICS`, Redis hash, tracks basic statistics for the queue.
key: (key) -> "#{prefix}:#{key}:#{@name}"
+
# ### Configurable Parameters
#
# Prototypal inherited parameters which can be overriden by instance
# properties include:
# + Polling interval in milliseconds
- # + Retry interval in milliseconds
# + Maximum times of retries
+ # + Retry interval in milliseconds
# + Storage capacity for newly finished tasks
# + Storage capacity for slowest tasks
polling_interval : 5
- retry_delay : 0.1 * 1000
retry_limit : 2
+ retry_delay : 0.1 * 1000
recent_size : 10
slowest_size : 10
+
# ### Placing Tasks
# Tasks will be pushed into `SOURCE` Redis lists:
@@ -357,9 +371,9 @@ class Queue
@_try_exit()
process.on 'exit', => @redis.hdel @key('WORKERS'), worker_id
-
@_poll()
+
# ### Poll New Task
# **Private** method. If any task presents in the `SOURCE` list, `lpop` from
@@ -392,6 +406,7 @@ class Queue
@redis.unwatch()
setTimeout @_poll, @polling_interval
+
# ### Exit When All Queues are Cleaned Up
# **Private** method. Wait if there're queues still working, or exit the
@@ -401,6 +416,7 @@ class Queue
process.exit() unless registered_workers.length
log_registered_workers()
+
# ### Process Each Group's First Task
# **Private** method. The real job is done by the passed in `handler` of
@@ -445,26 +461,26 @@ class Queue
errors.push err.message or null
switch err.do
when 'block'
- multi = @redis.multi()
- multi.rpush @key('FAILED'), JSON.stringify([task..., Date.now(), errors])
- multi.hdel @key('PROCESSING'), processing
- multi.sadd @key('BLOCKED'), task[1]
- multi.exec()
+ @redis.multi()
+ .rpush(@key('FAILED'), JSON.stringify([task..., Date.now(), errors]))
+ .hdel(@key('PROCESSING'), processing)
+ .sadd(@key('BLOCKED'), task[1])
+ .exec()
return @_poll()
when 'block-after-retry'
return setTimeout call_handler, @retry_delay if retry_count--
- multi = @redis.multi()
- multi.rpush @key('FAILED'), JSON.stringify([task..., Date.now(), errors])
- multi.hdel @key('PROCESSING'), processing
- multi.sadd @key('BLOCKED'), task[1]
- multi.exec()
+ @redis.multi()
+ .rpush(@key('FAILED'), JSON.stringify([task..., Date.now(), errors]))
+ .hdel(@key('PROCESSING'), processing)
+ .sadd(@key('BLOCKED'), task[1])
+ .exec()
return @_poll()
else
return setTimeout call_handler, @retry_delay if retry_count--
- multi = @redis.multi()
- multi.rpush @key('FAILED'), JSON.stringify([task..., Date.now(), errors])
- multi.hdel @key('PROCESSING'), processing
- multi.exec()
+ @redis.multi()
+ .rpush(@key('FAILED'), JSON.stringify([task..., Date.now(), errors]))
+ .hdel(@key('PROCESSING'), processing)
+ .exec()
# Success handling routine:
#
@@ -476,18 +492,18 @@ class Queue
# 4. Track tasks take the longest processing time in `SLOWEST` sorted
# set.
else
- multi = @redis.multi()
- multi.hdel @key('PROCESSING'), processing
finish_time = Date.now()
process_time = finish_time - start_time
- multi.hincrby @key('STATISTICS'), 'finished', 1
- multi.hincrby @key('STATISTICS'), 'total_pending_time', start_time - task[task.length - 1]
- multi.hincrby @key('STATISTICS'), 'total_process_time', process_time
- multi.lpush @key('RECENT'), JSON.stringify([task..., finish_time])
- multi.ltrim @key('RECENT'), 0, @recent_size - 1
- multi.zadd @key('SLOWEST'), process_time, JSON.stringify([task..., start_time])
- multi.zremrangebyrank @key('SLOWEST'), 0, - @slowest_size - 1
- multi.exec()
+ @redis.multi()
+ .hdel(@key('PROCESSING'), processing)
+ .hincrby(@key('STATISTICS'), 'FINISHED', 1)
+ .hincrby(@key('STATISTICS'), 'total_pending_time', start_time - task[task.length - 1])
+ .hincrby(@key('STATISTICS'), 'total_process_time', process_time)
+ .lpush(@key('RECENT'), JSON.stringify([task..., finish_time]))
+ .ltrim(@key('RECENT'), 0, @recent_size - 1)
+ .zadd(@key('SLOWEST'), process_time, JSON.stringify([task..., start_time]))
+ .zremrangebyrank(@key('SLOWEST'), 0, - @slowest_size - 1)
+ .exec()
@_continue_group task[1]
@@ -585,8 +601,8 @@ class Queue
@failed_tasks (err, tasks) =>
requeued_tasks = []
requeued_tasks.push tasks.map((task) -> JSON.stringify [task.id, task.params..., task.queued.valueOf()])...
- @blocked_groups (err, groups) =>
+ @blocked_groups (err, groups) =>
# Make sure all blocked `QUEUED` list are not touched when you
# reschedule tasks in them. Then, start the transaction as:
#
@@ -609,12 +625,8 @@ class Queue
client.quit()
return callback multi_err
if multi_res
- client.del @key('RESCHEDULE')
client.quit()
@statistics callback
- #=>
- # @rescheduling = off
- # callback.apply @, arguments if callback
else
reschedule callback
@@ -863,18 +875,22 @@ class Queue
# ### Clear A Queue
- # Remove **all** tasks of the queue, and reset statistics.
+ # Remove **all** tasks of the queue, and reset statistics. Set `TOTAL` to
+ # `PROCESSING` tasks to prevent negative pending tasks being calculated.
clear: (callback) =>
@redis.watch @key('SOURCE')
- @redis.keys "#{@key('QUEUED')}:*", (err, res) =>
- return callback err if err
- multi = @redis.multi()
- multi.del @key('GROUPS'), @key('RECENT'), @key('FAILED'), @key('SOURCE'), @key('STATISTICS'), @key('SLOWEST'), @key('BLOCKED'), res...
- multi.hmset @key('STATISTICS'), 'TOTAL', 0, 'finished', 0, 'total_pending_time', 0, 'total_process_time', 0
- multi.exec (err, res) =>
+ @redis.watch @key('PROCESSING')
+ @redis.hlen @key('PROCESSING'), (err, processing) =>
+ return callback? err if err
+ @redis.keys "#{@key('QUEUED')}:*", (err, res) =>
return callback? err if err
- return @clear callback unless res
- @statistics callback if callback
+ @redis.multi()
+ .del(@key('GROUPS'), @key('RECENT'), @key('FAILED'), @key('SOURCE'), @key('STATISTICS'), @key('SLOWEST'), @key('BLOCKED'), res...)
+ .hmset(@key('STATISTICS'), 'TOTAL', processing, 'FINISHED', 0, 'TOTAL_PENDING_TIME', 0, 'TOTAL_PROCESS_TIME', 0)
+ .exec (err, res) =>
+ return callback? err if err
+ return @clear callback unless res
+ @statistics callback if callback
# ### Get Statistics of a Queue Asynchronously
@@ -934,59 +950,59 @@ class Queue
# 4. Count failed task -- `LLEN` of `FAILED` list.
# 5. Get identifiers of blocked group -- `SMEMBERS` of `BLOCKED` set.
# 6. Count **live** workers of this queue -- `HLEN` of `WORKERS`.
- multi = @redis.multi()
- multi.scard @key('GROUPS')
- multi.hgetall @key('STATISTICS')
- multi.hlen @key('PROCESSING')
- multi.llen @key('FAILED')
- multi.smembers @key('BLOCKED')
- multi.hlen @key('WORKERS')
- multi.exec (multi_err, multi_res) =>
- return callback multi_err if multi_err
-
- # Process the result of the transaction.
- #
- # 1. Assign transaction results to result object, and:
- # 2. Convert:
- # - `total_pending_time` into `average_pending_time`, and:
- # - `total_process_time` into `average_process_time`
- # 3. Calibrate initial condition (in case of no task is finished).
- statistics = multi_res[1] or {}
- result =
- name: @name
- total:
- groups: multi_res[0]
- tasks: parseInt(statistics.TOTAL) or 0
- finished_tasks: parseInt(statistics.finished) or 0
- average_pending_time: Math.round(statistics.total_pending_time * 100 / statistics.finished) / 100
- average_process_time: Math.round(statistics.total_process_time * 100 / statistics.finished) / 100
- blocked:
- groups: multi_res[4].length
- processing_tasks: multi_res[2]
- failed_tasks: multi_res[3]
- workers: multi_res[5]
- if result.finished_tasks is 0
- result.average_pending_time = '-'
- result.average_process_time = '-'
-
- # Calculate blocked and pending tasks:
- #
- # 1. Initiate another transaction to count all `BLOCKED` tasks. Blocked
- # tasks are tasks in the `QUEUED` lists whose group identifiers are in
- # the `BLOCKED` set. **Note:** The leftmost task of each `QUEUED` list
- # will not be counted, since that's the causing (failed) task.
- # 2. Calculate pending tasks.
- #
- # The equation used to calculate pending tasks is:
- #
- # pending = total - finished - processing - failed - blocked
- multi2 = @redis.multi()
- multi2.llen "#{@key('QUEUED')}:#{group}" for group in multi_res[4]
- multi2.exec (multi2_err, multi2_res) ->
- return callback multi2_err if multi2_err
- result.blocked.tasks = multi2_res.reduce(((a, b) -> a + b), - result.blocked.groups)
- result.pending_tasks = result.total.tasks - result.finished_tasks - result.processing_tasks - result.failed_tasks - result.blocked.tasks
- callback null, result
+ @redis.multi()
+ .scard(@key('GROUPS'))
+ .hgetall(@key('STATISTICS'))
+ .hlen(@key('PROCESSING'))
+ .llen(@key('FAILED'))
+ .smembers(@key('BLOCKED'))
+ .hlen(@key('WORKERS'))
+ .exec (multi_err, multi_res) =>
+ return callback multi_err if multi_err
+
+ # Process the result of the transaction.
+ #
+ # 1. Assign transaction results to result object, and:
+ # 2. Convert:
+ # - `total_pending_time` into `average_pending_time`, and:
+ # - `total_process_time` into `average_process_time`
+ # 3. Calibrate initial condition (in case of no task is finished).
+ statistics = multi_res[1] or {}
+ result =
+ name: @name
+ total:
+ groups: multi_res[0]
+ tasks: parseInt(statistics.TOTAL) or 0
+ finished_tasks: parseInt(statistics.FINISHED) or 0
+ average_pending_time: Math.round(statistics.TOTAL_PENDING_TIME * 100 / statistics.FINISHED) / 100
+ average_process_time: Math.round(statistics.TOTAL_PROCESS_TIME * 100 / statistics.FINISHED) / 100
+ blocked:
+ groups: multi_res[4].length
+ processing_tasks: multi_res[2]
+ failed_tasks: multi_res[3]
+ workers: multi_res[5]
+ if result.finished_tasks is 0
+ result.average_pending_time = '-'
+ result.average_process_time = '-'
+
+ # Calculate blocked and pending tasks:
+ #
+ # 1. Initiate another transaction to count all `BLOCKED` tasks. Blocked
+ # tasks are tasks in the `QUEUED` lists whose group identifiers are in
+ # the `BLOCKED` set. **Note:** The leftmost task of each `QUEUED` list
+ # will not be counted, since that's the causing (failed) task.
+ # 2. Calculate pending tasks.
+ #
+ # The equation used to calculate pending tasks is:
+ #
+ # pending = total - finished - processing - failed - blocked
+ multi2 = @redis.multi()
+ multi2.llen "#{@key('QUEUED')}:#{group}" for group in multi_res[4]
+ multi2.exec (multi2_err, multi2_res) ->
+ return callback multi2_err if multi2_err
+ result.blocked.tasks = multi2_res.reduce(((a, b) -> a + b), - result.blocked.groups)
+ result.pending_tasks = result.total.tasks - result.finished_tasks - result.processing_tasks - result.failed_tasks - result.blocked.tasks
+ callback null, result
# Known Bugs:
View
86 src/server/fairy_active.js
@@ -1,86 +0,0 @@
-//--------------------------Table Crosshair--START----------------------------
-function hoverOver()
-{
- this.parentNode.className = "hoverRow";
-
- var rowElements = this.parentNode.parentNode.childNodes;
- // Check in which column the this cell object is at the moment.
- var column = 0;
- var o = this;
- while (o = o.previousSibling) column++;
- for (var row = 0; row < rowElements.length; row++)
- {
- if (rowElements[row].nodeType != 1) continue;
- rowElements[row].childNodes[column].className = "hoverColumn";
- }
-
- this.className = "hoverCell";
-}
-
-function hoverOut()
-{
- this.parentNode.className = "";
-
- var rowElements = this.parentNode.parentNode.childNodes;
- // Check in which column the this cell object is at the moment.
- var column = 0;
- var o = this;
- while (o = o.previousSibling) column++;
- for (var row = 0; row < rowElements.length; row++)
- {
- if (rowElements[row].nodeType != 1) continue;
- rowElements[row].childNodes[column].className = "";
- }
-}
-
-function init()
-{
- var rowElements = document.getElementsByTagName("tr");
- for (var row = 0; row < rowElements.length; row++)
- {
- columnElements = rowElements[row].childNodes;
- for (var column = 0; column < columnElements.length; column++)
- {
- columnElements[column].onmouseover = hoverOver;
- columnElements[column].onmouseout = hoverOut;
- }
- }
-}
-
-window.onload=init;
-//--------------------------Table Crosshair--end----------------------------
-
-
-//点击图标切换 统计 下显示表格的方式
-$('.icon-th').click( function(){
-
- $('#workers + .tabbable').addClass('xz');
- $(this).addClass('active');
- $('.icon-th-large').removeClass('active');
-})
-$('.icon-th-large').click( function(){
-
- $('#workers + .tabbable').removeClass('xz');
- $(this).addClass('active');
- $('.icon-th').removeClass('active');
-
-})
-
-//顶部阴影
-$(document).scroll(function(){
- var scroll_top = $(document).scrollTop();
- if(scroll_top>40){
- $('h1').addClass("h1_shadow");
- }
- else{
- $('h1').removeClass("h1_shadow");
- }
-})
-
-
-
-
-
-
-
-
View
2  test/1.test.coffee
@@ -24,7 +24,7 @@ describe ["Process #{total_tasks} Tasks of #{total_groups} Groups by #{total_wor
do reschedule = ->
queue.reschedule (err, statistics) ->
- setTimeout reschedule, 100
+ setTimeout reschedule, 100
wait_until_done queue, total_tasks, done
View
40 test/shared_steps.coffee
@@ -2,22 +2,25 @@ fs = require 'fs'
{exec} = require 'child_process'
should = require 'should'
-allowed_signals = ['SIGINT', 'SIGHUP', 'SIGUSR1', 'SIGUSR2', 'SIGTERM', 'SIGQUIT', 'SIGTERM', 'SIGABRT']
-random_signal = -> allowed_signals[parseInt Math.random() * allowed_signals.length]
+Array::random = -> @[parseInt Math.random() * @length]
-exports = module.exports =
+soft_kill_signals = [
+ 'SIGINT'
+ 'SIGHUP'
+ 'SIGQUIT'
+ 'SIGUSR1'
+ 'SIGUSR2'
+ 'SIGTERM'
+ 'SIGABRT'
+]
- kill_one: (queue, done) ->
- queue.workers (err, workers) ->
- return done() unless workers.length
- process.kill workers[parseInt Math.random() * workers.length].pid, random_signal()
- done()
+exports = module.exports =
clear_queue: (queue, done) ->
queue.clear (err, statistics) ->
should.not.exist err
- statistics.total.groups.should.equal 0
- statistics.total.tasks.should.equal 0
+ statistics.total.groups.should.equal 0
+ statistics.total.tasks.should.equal 0
statistics.pending_tasks.should.equal 0
done()
@@ -36,28 +39,35 @@ exports = module.exports =
sequence = group_sequence[group]++
queue.enqueue group, sequence, generate
+ kill_one: (queue, done) ->
+ queue.workers (err, workers) ->
+ return done() unless workers.length
+ process.kill workers.random().pid, soft_kill_signals.random()
+ done()
+
wait_until_done: (queue, total_tasks, done) ->
+ success_counter = 0
do probe = ->
queue.statistics (err, statistics) ->
if statistics.finished_tasks is total_tasks
statistics.pending_tasks.should.equal 0
statistics.processing_tasks.should.equal 0
- return done()
+ return done() if success_counter++ is 3
setTimeout probe, 100
clean_up: (queue, done) ->
- checked_times = 0
+ success_counter = 0
queue.workers (err, workers) ->
- process.kill worker.pid, random_signal() for worker in workers
+ process.kill worker.pid, soft_kill_signals.random() for worker in workers
do get_statistics = ->
queue.statistics (err, statistics) ->
return setTimeout get_statistics, 100 unless statistics.workers is 0
- return setTimeout get_statistics, 100 unless checked_times++ is 3
+ return setTimeout get_statistics, 100 unless success_counter++ is 3
statistics.pending_tasks.should.equal 0
done()
check_result: (total_groups, done) ->
for group in [0 .. total_groups - 1]
- for content, line in fs.readFileSync("#{__dirname}/workers/#{group}.dmp").toString().split('\n')[0..-2]
+ for content, line in fs.readFileSync("#{__dirname}/workers/#{group}.dmp").toString().split('\n')[0...-1]
content.should.equal "#{line}"
exec "rm -f #{__dirname}/workers/*.dmp", done
Please sign in to comment.
Something went wrong with that request. Please try again.