Skip to content

Commit

Permalink
resolved conflict in a simple_skiplist
Browse files Browse the repository at this point in the history
  • Loading branch information
Oleg Andreev committed Apr 28, 2008
2 parents 23704a1 + 8ff6a4c commit 61f8140
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 48 deletions.
131 changes: 92 additions & 39 deletions lib/strokedb/data_structures/simple_skiplist.rb
Expand Up @@ -16,10 +16,17 @@ def initialize(raw_list = nil, options = {})
options = options.stringify_keys
@maxlevel = options['maxlevel'] || DEFAULT_MAXLEVEL
@probability = options['probability'] || DEFAULT_PROBABILITY
@head = raw_list && unserialize_list!(raw_list) || new_head
@head, @tail = new_anchors(@maxlevel)
if raw_list
@head, @tail = unserialize_list!(raw_list)
end
@mutex = Mutex.new
end

def inspect
"#<#{self.class}:0x#{object_id.to_s(16)} items: #{to_a.inspect}, maxlevel: #{@maxlevel}, probability: #{@probability}>"
end

# Marshal API
def marshal_dump
raw_list = serialize_list(@head)
Expand All @@ -40,12 +47,11 @@ def marshal_load(dumped)
# Tests whether skiplist is empty.
#
def empty?
!node_next(@head, 0)
node_next(@head, 0) == @tail
end

# Complicated search algorithm
# TODO: add reverse support
# TODO: add key duplication support
#
def search(start_key, end_key, limit, offset, reverse, with_keys)
offset ||= 0
Expand All @@ -63,31 +69,32 @@ def search(start_key, end_key, limit, offset, reverse, with_keys)
#
def find_by_prefix(start_key, reverse)
# TODO: add reverse support
x = node_first # head [FIXME: change method name]
dir = dir_for_reverse(reverse)
x = anchor(reverse) # head [FIXME: change method name]
# if no prefix given, just return a first node
!start_key and return node_next(x, 0)
!start_key and return node_next(x, 0, dir)

level = node_level(x)
while level > 0
level -= 1
xnext = node_next(x, level)
xnext = node_next(x, level, dir)
while node_compare(xnext, start_key) < 0
x = xnext
xnext = node_next(x, level)
xnext = node_next(x, level, dir)
end
end
xnext == @tail and return nil
xnext == anchor(!reverse) and return nil
node_key(xnext)[0, start_key.size] != start_key and return nil
xnext
end

#
#
def skip_nodes(node, offset, reverse)
# TODO: add reverse support
dir = dir_for_reverse(reverse)
tail = @tail
while offset > 0 && node != tail
node = node_next(node, 0)
node = node_next(node, 0, dir)
offset -= 1
end
offset <= 0 ? node : nil
Expand All @@ -96,18 +103,18 @@ def skip_nodes(node, offset, reverse)
#
#
def collect_values(x, end_prefix, limit, reverse, with_keys)
# TODO: add reverse support
dir = dir_for_reverse(reverse)
values = []
meth = method(with_keys ? :node_pair : :node_value)
tail = @tail
tail = anchor(!reverse)
limit ||= Float::MAX
end_prefix ||= ""
pfx_size = end_prefix.size
while x != tail
node_key(x)[0, pfx_size] > end_prefix and return values
values.size >= limit and return values
values << meth.call(x).freeze
x = node_next(x, 0)
x = node_next(x, 0, dir)
end
values
end
Expand All @@ -122,10 +129,18 @@ def first_key
# Insert a key-value pair. If key already exists,
# value will be overwritten.
#
# <i> is a new node
# <M> is a marked node in a update_list
# <N> is next node to <M> which reference must be updated.
#
# M-----------------> i <---------------- N ...
# o ------> M ------> i <------ N ...
# o -> o -> o -> M -> i <- N ....
#
def insert(key, value, __level = nil)
@mutex.synchronize do
newlevel = __level || random_level
x = node_first
x = anchor
level = node_level(x)
update = Array.new(level)
x = find_with_update(x, level, key, update)
Expand Down Expand Up @@ -161,7 +176,7 @@ def find_with_update(x, level, key, update) #:nodoc:

# Find is thread-safe and requires no mutexes locking.
def find_nearest_node(key) #:nodoc:
x = node_first
x = anchor
level = node_level(x)
while level > 0
level -= 1
Expand Down Expand Up @@ -199,13 +214,15 @@ def find_nearest_node(key) #:nodoc:
# }
# end
end
declare_optimized_methods(:C) do
end

declare_optimized_methods(:C, :find_nearest_node, :find_with_update) do
declare_optimized_methods(:X, :find_nearest_node, :find_with_update) do
require 'rubygems'
require 'inline'
inline(:C) do |builder|
builder.prefix %{
static ID i_node_first, i_node_level;
static ID i_anchor, i_node_level;
#define SS_NODE_NEXT(x, level) (rb_ary_entry(rb_ary_entry(x, 0), level))
static int ss_node_compare(VALUE x, VALUE key)
{
Expand All @@ -216,13 +233,13 @@ def find_nearest_node(key) #:nodoc:
}
}
builder.add_to_init %{
i_node_first = rb_intern("node_first");
i_anchor = rb_intern("anchor");
i_node_level = rb_intern("node_level");
}
builder.c %{
VALUE find_nearest_node_C(VALUE key)
{
VALUE x = rb_funcall(self, i_node_first, 0);
VALUE x = rb_funcall(self, i_anchor, 0);
long level = FIX2LONG(rb_funcall(self, i_node_level, 1, x));
VALUE xnext;
while (level-- > 0)
Expand Down Expand Up @@ -275,8 +292,9 @@ def find(key)
end

def each_node #:nodoc:
x = node_next(node_first, 0)
while x
x = node_next(anchor, 0)
tail = @tail
while x != tail
yield(x)
x = node_next(x, 0)
end
Expand Down Expand Up @@ -315,11 +333,11 @@ def to_a
arr
end
end

private

def serialize_list(head)
head = node_first.dup
head = anchor.dup
head[0] = [ nil ] * node_level(head)
raw_list = [ head ]
prev_by_levels = [ head ] * node_level(head)
Expand All @@ -343,9 +361,12 @@ def serialize_list(head)
raw_list
end

# Returns head of an imported skiplist.
# Returns head & tail of an imported skiplist.
# Caution: raw_list is modified (thus the bang).
# Pass dup-ed value if you need.
#
# TODO: add double-linking!
#
def unserialize_list!(raw_list)
x = raw_list[0]
while x != nil
Expand All @@ -356,26 +377,26 @@ def unserialize_list!(raw_list)
# go next node
x = forwards[0]
end
# return head
raw_list[0]
# return anchors (head, tail)
[raw_list[0], new_anchor] # <- FIXME: normal tail anchor needed
end

# C-style API for node operations
def node_first
@head
def anchor(reverse = false)
reverse ? @tail : @head
end

def node_level(x)
x[0].size
end

def node_next(x, level)
x[0][level]
def node_next(x, level, dir = 0)
x[dir][level]
end

def node_compare(x, key)
return 1 unless x # tail
return -1 unless x[1] # head
return 1 if x == @tail # tail
return -1 if x == @head # head
x[1] <=> key
end

Expand All @@ -395,23 +416,55 @@ def node_set_value!(x, value)
x[2] = value
end

# before:
# prev -> next
# prev <- next
#
# after:
#
# prev -> new -> next
# prev <- new <- next
#
def node_insert_after!(x, prev, level)
x[0][level] = prev[0][level]
netx = node_next(prev, level) # 'next' is a reserved word in ruby

# forward links
x[0][level] = netx
prev[0][level] = x

# backward links
x[3][level] = prev
netx[3][level] = x
end

def new_node(level, key, value)
[
[nil]*level,
key,
value
[
[nil]*level,
key,
value,
[nil]*level
]
end

def new_head
# TODO: drop this after serialization routines updated
def new_anchor
new_node(@maxlevel, nil, nil)
end


def new_anchors(level)
h = new_node(level, nil, nil)
t = new_node(level, nil, nil)
level.times do |i|
h[0][i] = t
t[3][i] = h
end
[h, t]
end

def dir_for_reverse(reverse)
reverse ? 3 : 0
end

def random_level
p = @probability
m = @maxlevel
Expand Down
14 changes: 9 additions & 5 deletions lib/strokedb/volumes/skiplist_volume.rb
Expand Up @@ -120,11 +120,10 @@ def insert(key, value, __level = nil)
write_log(key, value, __level)
@list.insert(key, value, __level)
dump! if @log_bytes > @max_log_size
self
rescue => e
crash!(e)
raise
ensure
self
end

# Volume operations
Expand All @@ -133,6 +132,7 @@ def insert(key, value, __level = nil)
# Read-only access remains.
def close!
dump!
self
class <<self
alias :insert :raise_volume_closed
alias :close! :raise_volume_crashed
Expand All @@ -157,8 +157,8 @@ def dump!
File.rename(@log_path, @log_tmppath)
File.delete(@log_tmppath)

init_log_file(@log_path)

@log_file = init_log_file(@log_path)
self
rescue => e
error "Dump failed!"
crash!(e)
Expand All @@ -183,6 +183,7 @@ class <<self
class LogFormatError < StandardError; end
class VolumeClosedException < StandardError; end
class VolumeCrashedException < StandardError; end
class MessageTooBig < StandardError; end

private

Expand All @@ -196,7 +197,7 @@ def replay_log!(log_path, list)
msg_range = (0..-(1 + checksum_length))

File.open(@log_path, "r") do |f|
msg_length = f.read(4).unpack(nf).first
msg_length = f.read(4).unpack(nf).first rescue nil
(!msg_length || msg_length > max_msg_length) and raise LogFormatError, "Wrong WAL message length prefix!"

msg_chk = f.read(msg_length + checksum_length)
Expand Down Expand Up @@ -231,6 +232,9 @@ def replay_log!(log_path, list)

def write_log(key, value, level)
msg = Marshal.dump([key, value, level])
if msg.size > MAX_LOG_MSG_LENGTH
raise MessageTooBig, "Key-value pair is too big to be inserted (limit is #{MAX_LOG_MSG_LENGTH} bytes)"
end
digest = Digest::MD5.digest(msg)
@log_file.write([msg.size].pack(N_F))
@log_file.write(msg)
Expand Down
13 changes: 11 additions & 2 deletions spec/lib/strokedb/data_structures/simple_skiplist_spec.rb
Expand Up @@ -38,7 +38,7 @@
:maxlevel => @maxlevel
},
:raw_list => [
[[nil]*@maxlevel, nil, nil]
[[nil]*@maxlevel, nil, nil, [nil]*@maxlevel]
]
}
end
Expand Down Expand Up @@ -77,7 +77,10 @@
it "should correctly insert keys in an ascending level order" do
1.upto(@maxlevel) do |i|
k = "x#{i}"
@list.insert(k, k, i).should == @list
r = @list.insert(k, k, i)
r.object_id.should == @list.object_id
r.should == @list

@list.find("").should == nil
@list.find(k).should == k
@list.find("-").should == nil
Expand Down Expand Up @@ -204,6 +207,8 @@
end
end

it_should_behave_like "Skiplist serialization"

it "should find all items" do
search_should_yield(@key_values)
end
Expand Down Expand Up @@ -247,6 +252,10 @@
search_should_yield([], :start_key => "ab1", :end_key => "b")
end

it "should search in a reverse order" do
r = search_with_options(@list, :reverse => true, :with_keys => true)
r.should == @key_values.reverse
end

def search_should_yield(results, os = {})
# TODO: added reverse cases
Expand Down

0 comments on commit 61f8140

Please sign in to comment.