Skip to content

Commit

Permalink
[flow] do not use efn
Browse files Browse the repository at this point in the history
  • Loading branch information
indutny committed Mar 23, 2011
1 parent a44f5fe commit 642286d
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 67 deletions.
49 changes: 34 additions & 15 deletions lib/index/core/compact.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,23 @@ exports.compact = (_callback) ->
process.nextTick ->
_callback and _callback err, data

efn = utils.efn callback

if @lock(=> @compact _callback)
return

iterate = (callback) ->
efn((err, page) ->
(err, page) ->
if err
return @paralell() err

in_leaf = page[0] && page[0][2]
fns = page.map (item) ->
->
step (->
step ->
storage.read item[1], @parallel()
), efn((err, data) ->
, (err, data) ->
if err
return @parallel() err

if in_leaf
# data is actual value
# remove old revision referense
Expand All @@ -59,31 +63,46 @@ exports.compact = (_callback) ->
return

iterate(@parallel()) null, data
), efn((err, new_pos) ->
, (err, new_pos) ->
if err
return @parallel() err

item[1] = new_pos
@parallel() null
), @parallel()
, @parallel()

fns.push (err) ->
if err
return @parallel() err

fns.push(efn (err) ->
storage.write page, @parallel()
)

fns.push callback

step.apply null, fns
)


step (->
step ->
# will allow storage controller
# to prepare it for
storage.beforeCompact && storage.beforeCompact @parallel()
@parallel() null
), efn((err) ->
, (err) ->
if err
return @parallel() err

storage.readRoot iterate @parallel()
), efn((err, new_root_pos) ->
, (err, new_root_pos) ->
if err
return @parallel() err

storage.writeRoot new_root_pos, @parallel()
), efn((err) ->
, (err) ->
if err
return @parallel() err

# @will allow storage to finalize all actions
storage.afterCompact && storage.afterCompact @parallel()
@parallel() null
), callback
, callback

21 changes: 13 additions & 8 deletions lib/index/core/get.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ step = require 'step'
exports.get = (key, callback) ->
sort = @sort
storage = @storage
efn = utils.efn callback

iterate = efn (err, index) ->
iterate = (err, index) ->
if err
return callback err

item_index = utils.search index, sort, key
item = index[item_index]
Expand All @@ -53,7 +54,10 @@ exports.get = (key, callback) ->
return callback 'Not found'

# Read actual value
storage.read value, efn (err, value) ->
storage.read value, (err, value) ->
if err
return callback err

# value = [value, link-to-previous-value]
callback null, value[0]
else
Expand Down Expand Up @@ -81,15 +85,16 @@ exports.traverse = (filter) ->
filter = filter || (kp, callback) -> callback(null, true)

process.nextTick =>
efn = utils.efn (err) ->
promise.emit 'error', err
promise.emit 'end'

sort = @sort
storage = @storage

iterate = (callback) ->
efn (err, page) ->
(err, page) ->
if err
promise.emit 'error', err
promise.emit 'end'
return

index = -1
pagelen = page.length

Expand Down
52 changes: 37 additions & 15 deletions lib/index/core/set.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@ exports.set = (key, value, _callback) ->
_callback and _callback err, data


efn = utils.efn callback

iterate = (page, callback) ->
item_index = utils.search page, sort, key
item = page[item_index]
Expand All @@ -60,9 +58,15 @@ exports.set = (key, value, _callback) ->
# Read next page and try to insert kv in it
step ->
storage.read item[1], @parallel()
, efn((err, page) ->
, (err, page) ->
if err
return @parallel() err

iterate page, @parallel()
), efn((err, result) ->
, (err, result) ->
if err
return @parallel() err

if storage.isPosition result
# Page is just should be overwrited
page[item_index][1] = result
Expand All @@ -80,7 +84,7 @@ exports.set = (key, value, _callback) ->
[result.middle_key, result.right_page]

splitPage false, storage, order, page, callback
)

else
# Leaf
step ->
Expand All @@ -91,34 +95,49 @@ exports.set = (key, value, _callback) ->
return

# Invoke conflictManager
step (->
step ->
storage.read item[1], @parallel()
), efn((err, old_value) ->
, (err, old_value) ->
if err
return @parallel() err

@parallel() null, old_value
that.conflictManager old_value, value, @parallel()
), @parallel()
, @parallel()

return

@parallel() null, value
, efn((err, value, old_value) ->
, (err, value, old_value) ->
if err
return @parallel() err

# Value should be firstly written in storage
item_index = if item_index is null then 0 else item_index + 1
storage.write [value, old_value], @parallel()
), efn((err, value) ->
, (err, value) ->
if err
return @parallel() err

# Than inserted in leaf page
page.splice item_index, 0, [key, value, 1]

splitPage true, storage, order, page, callback
)


step ->
# Read initial data
storage.readRoot @parallel()
, efn((err, root) ->
, (err, root) ->
if err
return @parallel() err

# Initiate sequence
iterate root, @parallel()
), efn((err, result) ->
, (err, result) ->
if err
return @parallel() err

if storage.isPosition result
# Write new root
@parallel() null, result
Expand All @@ -128,9 +147,12 @@ exports.set = (key, value, _callback) ->
[null, result.left_page],
[result.middle_key, result.right_page]
], @parallel()
), efn((err, new_root_pos) ->
, (err, new_root_pos) ->
if err
return @parallel() err

storage.writeRoot new_root_pos, @parallel()
), efn(callback)
, callback

###
Check page length
Expand Down
42 changes: 28 additions & 14 deletions lib/index/core/unset.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ exports.unset = (key, _callback) ->
process.nextTick ->
_callback and _callback err, data

efn = utils.efn callback
storage = @storage
sort = @sort

Expand Down Expand Up @@ -67,20 +66,26 @@ exports.unset = (key, _callback) ->
page.splice item_index, 1
if page.length > 0
# If resulting page isn't empty
step (->
step ->
storage.write page, @parallel()
), efn(callback)
, callback
return

# Notify that item should be removed from parent index
callback null, false
else
# Index page
step (->
step ->
storage.read item[1], @parallel()
), efn((err, page) ->
, (err, page) ->
if err
return @parallel() err

iterate page, @parallel()
), efn((err, result) ->
, (err, result) ->
if err
return @parallel() err

if result is false
# Delete item from index page
page.splice item_index, 1
Expand All @@ -93,16 +98,22 @@ exports.unset = (key, _callback) ->
callback null
return

step (->
step ->
storage.write page, @parallel()
), efn(callback)
)
, callback


step (->
step ->
storage.readRoot @parallel()
), efn((err, root) ->
, (err, root) ->
if err
return @parallel() err

iterate root, @parallel()
), efn((err, result) ->
, (err, result) ->
if err
return @parallel() err

if result is false
# Create new root
storage.write [], @parallel()
Expand All @@ -111,10 +122,13 @@ exports.unset = (key, _callback) ->
@parallel() null, result
else
@parallel() null
), efn((err, position) ->
, (err, position) ->
if err
return @parallel() err

if storage.isPosition position
storage.writeRoot position, @parallel()
else
@parallel() null
), callback
, callback

15 changes: 0 additions & 15 deletions lib/index/utils.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -71,21 +71,6 @@ utils.search = (index, sort, key) ->
else
null

###
Wrapper for asynchronous callback
###
utils.efn = (callback) ->
(fn) ->
# Callback can be empty
unless fn
fn = -> null

(err) ->
if err
return callback err

fn.apply @, arguments

###
Hash function wrapper
###
Expand Down

0 comments on commit 642286d

Please sign in to comment.