This repository has been archived by the owner on Dec 6, 2018. It is now read-only.
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
fb1a03a
commit e2d27cc
Showing
2 changed files
with
92 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
var shasum = require('shasum') | ||
|
||
//if a job starts, and another is queued before the current job ends, | ||
//delay it, so that the job is only triggered once. | ||
|
||
|
||
module.exports = function (input, jobs, map, work) { | ||
if(!work) work = map, map = function (data) { return data.key } | ||
//create a subsection for the jobs, | ||
//if you don't pass in a separate db, | ||
//create a section inside the input | ||
var pending = {}, running = {} | ||
|
||
if('string' === typeof jobs) | ||
jobs = input.sublevel(jobs) | ||
|
||
var retry = [] | ||
|
||
function doJob (data) { | ||
//don't process deletes! | ||
if(!data.value) return | ||
var hash = shasum(data.value) | ||
|
||
if(!running[hash]) | ||
running[hash] = true | ||
else return | ||
|
||
var done = false | ||
|
||
work(data.value, function (err) { | ||
if(done) return | ||
done = true | ||
if(err) { | ||
running[hash] | ||
return setTimeout(function () { | ||
doJob(data) | ||
}, 500) | ||
} | ||
|
||
jobs.del(data.key, function (err) { | ||
if(err) return retry.push(data) | ||
delete running[hash] | ||
if(pending[hash]) { | ||
delete pending[hash] | ||
doJob(data) | ||
} | ||
}) | ||
}) | ||
} | ||
|
||
input.pre(function (ch, add) { | ||
var key = map(ch) | ||
var hash = shasum(key) | ||
console.log('KEY', key) | ||
if(!pending[hash]) | ||
add({key: Date.now(), value: key, type: 'put'}, jobs) | ||
else | ||
pending[hash] = (0 || pending[hash]) + 1 | ||
}) | ||
|
||
jobs.createReadStream().on('data', doJob) | ||
jobs.post(doJob) | ||
|
||
return jobs | ||
} | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
|
||
var db = require('../')(require('levelup')('/tmp/whatever')) | ||
var copy = require('../')(require('levelup')('/tmp/whatever2')) | ||
|
||
var makeQueue = require('./queue') | ||
|
||
makeQueue(db, 'jobs', function (key, done) { | ||
console.log("JOB KEY", key) | ||
db.get(key, function (err, value) { | ||
console.log(key, value, err) | ||
value && copy.put(key, value, done) || done() | ||
}) | ||
}) | ||
|
||
db.put('hello' + Date.now(), 'value_' + new Date(), function () { | ||
|
||
setTimeout(function () { | ||
|
||
copy | ||
.createReadStream() | ||
.on('data', console.log) | ||
|
||
}, 1000) | ||
|
||
}) | ||
|