Skip to content

Commit

Permalink
Only use memory queues. Write queues to disk when receiving a TERM
Browse files Browse the repository at this point in the history
git-svn-id: http://sparrow.googlecode.com/svn/trunk@77 4e43a301-793d-0410-b222-f1b7b3a1f9c5
  • Loading branch information
maccman committed Aug 2, 2008
1 parent da12b89 commit 18338d0
Show file tree
Hide file tree
Showing 9 changed files with 48 additions and 134 deletions.
1 change: 0 additions & 1 deletion Manifest.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ Rakefile
bin/sparrow
lib/sparrow.rb
lib/sparrow/queue.rb
lib/sparrow/queues/disk.rb
lib/sparrow/queues/memory.rb
lib/sparrow/queues/sqlite.rb
lib/sparrow/runner.rb
Expand Down
2 changes: 2 additions & 0 deletions README.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ Sparrow
# The load Sparrow can cope with increases exponentially as you add to the cluster.
# Sparrow also takes advantage of eventmachine, which uses a non-blocking io, offering great performance.
#
# Sparrow is a in-memory queue but will persist the data to disk when receiving a term signal.
#
# Sparrow comes with built in support for daemonization and clustering.
# Also included are example libraries and clients. For example:
#
Expand Down
1 change: 1 addition & 0 deletions Rakefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Hoe.new('sparrow', Sparrow::VERSION) do |p|
p.url = 'http://code.google.com/p/sparrow'
p.changes = p.paragraphs_of('History.txt', 0..1).join("\n\n")
p.extra_deps << ['eventmachine', '>=0.10.0']
p.extra_deps << ['sqlite3-ruby', '>=1.2.2']
end

# vim: syntax=Ruby
1 change: 0 additions & 1 deletion lib/sparrow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,5 @@ def pid_dir
require 'sparrow/server'
require 'sparrow/queues/sqlite' rescue LoadError nil
require 'sparrow/queues/memory'
require 'sparrow/queues/disk'
require 'sparrow/queue'
require 'sparrow/runner'
17 changes: 7 additions & 10 deletions lib/sparrow/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,7 @@ class Queue

class << self
def get_queue(queue_name)
@@queues[queue_name] ||= case Sparrow.options[:type]
when 'memory': Sparrow::Queues::Memory.new(queue_name)
when 'sqlite': Sparrow::Queues::Sqlite.new(queue_name)
else
Sparrow::Queues::Disk.new(queue_name)
end
@@queues[queue_name] ||= Sparrow::Queues::Memory.new(queue_name)
end

def next_message(queue_name)
Expand All @@ -31,14 +26,17 @@ def delete(queue_name)
end

def delete_all
@@queues.each {|name, q| q.clear! }
@@queues = {}
FileUtils.rm_rf base_dir
FileUtils.mkdir_p base_dir
end

def shutdown!
@@queues.each {|name, q| q.shutdown! }
end

def get_stats(queue_name)
stats = {
:type => Sparrow.options[:type],
:type => 'memory',
:total_bytes => (File.size?(Sparrow.base_dir) || 0),
:queues => Dir.glob(File.join(Sparrow.base_dir, '*')).collect {|s| File.basename(s) }.join(','),
:number_of_queues => queues.keys.length,
Expand All @@ -53,7 +51,6 @@ def get_stats(queue_name)
if queue_name
queue = get_queue(queue_name)
stats.merge!({
:bytes => Dir.glob(File.join(Sparrow.base_dir, queue_name + '**')).inject(0){|a, b| a += (File.size?(b) || 0); a },
:total_items => queue.count_push,
:curr_items => queue.count
})
Expand Down
109 changes: 0 additions & 109 deletions lib/sparrow/queues/disk.rb

This file was deleted.

33 changes: 29 additions & 4 deletions lib/sparrow/queues/memory.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,51 @@ def initialize(queue_name)
self.queue_data = []
self.count_pop = 0
self.count_push = 0
recover!
end

def pop
self.count_pop += 1
queue_data.shift
self.queue_data.shift
end

def push(value)
self.count_push += 1
queue_data.push(value)
self.queue_data.push(value)
end

def clear
def clear!
self.queue_data = []
self.sqlite.clear!
end

def count
queue_data.length
end


def to_disk!
copy = self.queue_data.dup
copy.each do |value|
self.sqlite.push(value)
end
self.queue_data = self.queue_data - copy
end

def shutdown!
self.to_disk!
end

def recover!
logger.debug "Recovering queue"
while msg = self.sqlite.pop
self.push(msg)
end
end

def sqlite
@sqlite ||= Sparrow::Queues::Sqlite.new(self.queue_name)
end

end
end
end
5 changes: 3 additions & 2 deletions lib/sparrow/queues/sqlite.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,17 @@ def push(value)

def pop
id, value = db.get_first_row("SELECT * FROM queues LIMIT 1;")
return unless id and value
db.execute("DELETE FROM queues WHERE id = ?", id)
self.count_pop += 1
value
end

def count
db.get_first_value("SELECT COUNT FROM queues")
db.get_first_value("SELECT count(*) FROM queues").to_i
end

def clear
def clear!
db.execute("DELETE FROM queues")
end

Expand Down
13 changes: 6 additions & 7 deletions lib/sparrow/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def initialize

self.options.merge!({
:base_dir => base_dir,
:pid_dir => pid_dir,
:pid_dir => pid_dir,
:log_path => log_path
})

Expand Down Expand Up @@ -64,6 +64,9 @@ def start
end

def stop
puts
puts "Writing queues to disk..."
Sparrow::Queue.shutdown!
puts "Stopping Eventmachine Server"
EventMachine::stop
end
Expand All @@ -72,7 +75,7 @@ def parse_options
OptionParser.new do |opts|
opts.summary_width = 25
opts.banner = "Sparrow (#{VERSION})\n\n",
"Usage: sparrow [-b path] [-t type] [-h host] [-p port] [-P file]\n",
"Usage: sparrow [-b path] [-h host] [-p port] [-P file]\n",
" [-d] [-k port] [-l file] [-e]\n",
" sparrow --help\n",
" sparrow --version\n"
Expand All @@ -84,10 +87,6 @@ def parse_options
options[:base_dir] = File.expand_path(v)
end

opts.on("-t", "--type QUEUE_TYPE", String, "Type of queue (disk/memory/sqlite).", "(default: #{options[:type]})") do |v|
options[:type] = v
end

opts.separator ""; opts.separator "Network:"

opts.on("-h", "--host HOST", String, "Specify host", "(default: #{options[:host]})") do |v|
Expand Down Expand Up @@ -154,7 +153,7 @@ def kill_pid(k)
puts f
pid = IO.read(f).chomp.to_i
FileUtils.rm f
Process.kill(9, pid)
Process.kill(15, pid) # TERM
puts "killed PID: #{pid}"
rescue => e
puts "Failed to kill! #{k}: #{e}"
Expand Down

0 comments on commit 18338d0

Please sign in to comment.