Skip to content

Commit

Permalink
Add dbm support; use dbm rather than tc for vis; remove mandatory tc
Browse files Browse the repository at this point in the history
  • Loading branch information
jhellerstein committed Apr 13, 2011
1 parent 633ac3d commit 56adcb1
Show file tree
Hide file tree
Showing 7 changed files with 505 additions and 3 deletions.
3 changes: 3 additions & 0 deletions lib/bud.rb
Expand Up @@ -13,6 +13,7 @@
require 'bud/rtrace'
require 'bud/server'
require 'bud/state'
require 'bud/storage/dbm'
require 'bud/storage/tokyocabinet'
require 'bud/storage/zookeeper'
require 'bud/viz'
Expand Down Expand Up @@ -159,6 +160,7 @@ def initialize(options={})
@rewritten_strata = []
@channels = {}
@tc_tables = {}
@dbm_tables = {}
@zk_tables = {}
@callbacks = {}
@callback_id = 0
Expand Down Expand Up @@ -686,6 +688,7 @@ def do_flush
@channels.each { |c| @tables[c[0]].flush }
@zk_tables.each_value { |t| t.flush }
@tc_tables.each_value { |t| t.flush }
@dbm_tables.each_value { |t| t.flush }
end

def stratum_fixpoint(strat, strat_num)
Expand Down
7 changes: 7 additions & 0 deletions lib/bud/state.rb
Expand Up @@ -98,6 +98,13 @@ def tctable(name, schema=nil)
@tables[name] = Bud::BudTcTable.new(name, self, schema)
@tc_tables[name] = @tables[name]
end

# declare a dbm table
def dbm_table(name, schema=nil)
define_collection(name)
@tables[name] = Bud::BudDbmTable.new(name, self, schema)
@dbm_tables[name] = @tables[name]
end

# declare an Apache ZooKeeper table
def zktable(name, path, addr="localhost:2181")
Expand Down
170 changes: 170 additions & 0 deletions lib/bud/storage/dbm.rb
@@ -0,0 +1,170 @@
require 'dbm'

module Bud
# Persistent table implementation based on TokyoCabinet.
class BudDbmTable < BudCollection # :nodoc: all
def initialize(name, bud_instance, given_schema)
dbm_dir = bud_instance.options[:dbm_dir]
raise BudError, "dbm support must be enabled via 'dbm_dir'" unless dbm_dir
unless File.exists?(dbm_dir)
Dir.mkdir(dbm_dir)
puts "Created directory: #{dbm_dir}" unless bud_instance.options[:quiet]
end

dirname = "#{dbm_dir}/bud_#{bud_instance.port}"
unless File.exists?(dirname)
Dir.mkdir(dirname)
puts "Created directory: #{dirname}" unless bud_instance.options[:quiet]
end

super(name, bud_instance, given_schema)
@to_delete = []

db_fname = "#{dirname}/#{name}.dbm"
flags = DBM::WRCREAT
if bud_instance.options[:dbm_newdb] == true
flags |= DBM::NEWDB
end
@dbm = DBM.open(db_fname, 0666, flags)
if @dbm.nil?
raise BudError, "Failed to open dbm database '#{db_fname}': #{@dbm.errmsg}"
end
end

def init_storage
# XXX: we can't easily use the @storage infrastructure provided by
# BudCollection; issue #33
@storage = nil
end

def [](key)
key_s = MessagePack.pack(key)
val_s = @dbm[key_s]
if val_s
return make_tuple(key, MessagePack.unpack(val_s))
else
return @delta[key]
end
end

def has_key?(k)
key_s = MessagePack.pack(k)
return true if @dbm.has_key? key_s
return @delta.has_key? k
end

def include?(tuple)
key = @key_colnums.map{|k| tuple[k]}
value = self[key]
return (value == tuple)
end

def make_tuple(k_ary, v_ary)
t = Array.new(k_ary.length + v_ary.length)
@key_colnums.each_with_index do |k,i|
t[k] = k_ary[i]
end
val_cols.each_with_index do |c,i|
t[schema.index(c)] = v_ary[i]
end
tuple_accessors(t)
end

def each(&block)
each_from([@delta], &block)
each_storage(&block)
end

def each_from(bufs, &block)
bufs.each do |b|
if b == @storage then
each_storage(&block)
else
b.each_value do |v|
yield v
end
end
end
end

def each_storage(&block)
@dbm.each do |k,v|
k_ary = MessagePack.unpack(k)
v_ary = MessagePack.unpack(v)
yield make_tuple(k_ary, v_ary)
end
end

def flush
end

def close
@dbm.close unless @dbm.nil?
@dbm = nil
end

def merge_to_db(buf)
buf.each do |key,tuple|
merge_tuple(key, tuple)
end
end

def merge_tuple(key, tuple)
val = val_cols.map{|c| tuple[schema.index(c)]}
key_s = MessagePack.pack(key)
val_s = MessagePack.pack(val)
if @dbm.has_key?(key_s)
old_tuple = self[key]
raise_pk_error(tuple, old_tuple) if tuple != old_tuple
else
@dbm[key_s] = val_s
end
end

# move deltas to TC, and new_deltas to deltas
def tick_deltas
merge_to_db(@delta)
@delta = @new_delta
@new_delta = {}
end

superator "<-" do |o|
o.each do |tuple|
@to_delete << tuple unless tuple.nil?
end
end

def insert(tuple)
key = @key_colnums.map{|k| tuple[k]}
merge_tuple(key, tuple)
end

alias << insert

# Remove to_delete and then add pending to db
def tick
@to_delete.each do |tuple|
k = @key_colnums.map{|c| tuple[c]}
k_str = MessagePack.pack(k)
cols_str = @dbm[k_str]
unless cols_str.nil?
db_cols = MessagePack.unpack(cols_str)
delete_cols = val_cols.map{|c| tuple[schema.index(c)]}
if db_cols == delete_cols
@dbm.delete k_str
end
end
end
@to_delete = []

merge_to_db(@pending)
@pending = {}

flush
end

def method_missing(sym, *args, &block)
@dbm.send sym, *args, &block
end
end
end
6 changes: 5 additions & 1 deletion lib/bud/storage/tokyocabinet.rb
@@ -1,4 +1,8 @@
require 'tokyocabinet'
begin
require 'tokyocabinet'
Bud::HAVE_TOKYOCABINET = true
rescue LoadError
end

module Bud
# Persistent table implementation based on TokyoCabinet.
Expand Down
4 changes: 2 additions & 2 deletions lib/bud/viz.rb
Expand Up @@ -9,7 +9,7 @@ def initialize(bud_instance)
@bud_instance = bud_instance
return if bud_instance.class == Stratification or @bud_instance.class == DepAnalysis
@meta_tables = {'t_rules' => 1, 't_depends' => 1, 't_table_info' => 1, 't_cycle' => 1, 't_stratum' => 1, 't_depends_tc' => 1, 't_table_schema' => 1}
@bud_instance.options[:tc_dir] = "TC_#{@bud_instance.class}_#{bud_instance.options[:tag]}_#{bud_instance.object_id}_#{bud_instance.port}"
@bud_instance.options[:dbm_dir] = "DBM_#{@bud_instance.class}_#{bud_instance.options[:tag]}_#{bud_instance.object_id}_#{bud_instance.port}"
@table_info = new_tab(:t_table_info, [:tab_name, :tab_type], @bud_instance)
@table_schema = new_tab(:t_table_schema, [:tab_name, :col_name, :ord], @bud_instance)

Expand Down Expand Up @@ -42,7 +42,7 @@ def initialize(bud_instance)
end

def new_tab(name, schema, instance)
ret = Bud::BudTcTable.new(name, instance, schema)
ret = Bud::BudDbmTable.new(name, instance, schema)
instance.tables[name] = ret
return ret
end
Expand Down

0 comments on commit 56adcb1

Please sign in to comment.