Use adapters for the link and page queues #37

Open
wants to merge 12 commits into from

3 participants

@EvilScott

For now there are just two adapters: the default (Ruby's Queue class) and Redis. All tests pass.

lib/anemone/queue/redis.rb
((7 lines not shown))
+
+module Anemone
+ module Queue
+ class Redis
+
+ def initialize(queue_type, opts = {})
+ unless [:link, :page].include?(queue_type)
+ raise 'You must specify a queue type (:link or :page)'
+ end
+ @redis = ::Redis.new(opts)
+ @prefix = "#{opts[:key_prefix] || 'anemone'}:#{queue_type}"
+ clear
+ end
+
+ def <<(job)
+ id = @redis.incr("#{@prefix}:counter")

Use Redis List instead of multiple keys http://redis.io/commands#list
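
To illustrate the list-based approach suggested here: pushing on one end of a list and popping from the other gives FIFO order under a single key. The sketch below is only an illustration. `FakeListStore` is a hypothetical in-memory stand-in that mimics the semantics of the Redis `LPUSH`, `RPOP`, and `LLEN` commands so the example runs without a server; it is not the redis gem's client.

```ruby
require 'json'

# Hypothetical in-memory stand-in for three Redis list commands.
class FakeListStore
  def initialize
    @lists = Hash.new { |hash, key| hash[key] = [] }
  end

  # LPUSH: insert at the head of the list and return the new length.
  def lpush(key, value)
    @lists[key].unshift(value)
    @lists[key].length
  end

  # RPOP: remove and return the tail element, or nil when the list is empty.
  def rpop(key)
    @lists[key].pop
  end

  # LLEN: number of elements in the list.
  def llen(key)
    @lists[key].length
  end
end

store = FakeListStore.new
list  = 'anemone:example' # assumed key name, for illustration only

store.lpush(list, { 'url' => 'http://example.com/a' }.to_json)
store.lpush(list, { 'url' => 'http://example.com/b' }.to_json)

# Jobs come back in insertion order: a first, then b.
first_job  = JSON.parse(store.rpop(list))
second_job = JSON.parse(store.rpop(list))
```

With one list per queue, `size` maps onto a single length lookup instead of a key scan.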

lib/anemone/queue/redis.rb
@@ -0,0 +1,69 @@
+begin
+ require 'redis'
+rescue LoadError
+ puts "You need the redis gem to use Anemone::Queue::Redis"
+ exit
+end
+
+module Anemone
+ module Queue
+ class Redis
+
+ def initialize(queue_type, opts = {})

Use self.hash (get rid of negatives?) instead of passing in queue_type to build a unique key

lib/anemone/queue/redis.rb
((11 lines not shown))
+
+ def initialize(queue_type, opts = {})
+ unless [:link, :page].include?(queue_type)
+ raise 'You must specify a queue type (:link or :page)'
+ end
+ @redis = ::Redis.new(opts)
+ @prefix = "#{opts[:key_prefix] || 'anemone'}:#{queue_type}"
+ clear
+ end
+
+ def <<(job)
+ id = @redis.incr("#{@prefix}:counter")
+ job.each { |k,v| @redis.hset("#{@prefix}:#{id}", k, v) }
+ end
+
+ def deq

Keep a key of number of waiting threads. incr at the beginning of this function, decr at the end.
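
A minimal sketch of that counter pattern, assuming an in-process queue rather than Redis: `CountingQueue` is a hypothetical wrapper around Ruby's stdlib `Queue`, and a mutex-guarded integer stands in for the Redis key. The `ensure` clause makes sure the decrement runs even if the blocking pop raises.

```ruby
require 'thread'

# Hypothetical wrapper that counts consumers currently blocked in deq.
class CountingQueue
  def initialize
    @queue   = ::Queue.new
    @waiting = 0
    @lock    = Mutex.new
  end

  def <<(job)
    @queue << job
    self
  end

  # Increment on entry, block for a job, always decrement on the way out.
  def deq
    @lock.synchronize { @waiting += 1 }
    @queue.pop
  ensure
    @lock.synchronize { @waiting -= 1 }
  end

  def num_waiting
    @lock.synchronize { @waiting }
  end
end
```

With Redis the counter would live in a key shared by all processes, so the same increment and decrement pairing applies across machines.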

lib/anemone/queue/redis.rb
((23 lines not shown))
+ job.each { |k,v| @redis.hset("#{@prefix}:#{id}", k, v) }
+ end
+
+ def deq
+ key = keys.last
+ val = rget(key)
+ @redis.del(key)
+ val
+ end
+
+ def empty?
+ keys.count == 0
+ end
+
+ def size
+ keys.count

Use LLEN

lib/anemone/queue/redis.rb
((19 lines not shown))
+ end
+
+ def <<(job)
+ id = @redis.incr("#{@prefix}:counter")
+ job.each { |k,v| @redis.hset("#{@prefix}:#{id}", k, v) }
+ end
+
+ def deq
+ key = keys.last
+ val = rget(key)
+ @redis.del(key)
+ val
+ end
+
+ def empty?
+ keys.count == 0

use size == 0

lib/anemone/queue/redis.rb
((27 lines not shown))
+ key = keys.last
+ val = rget(key)
+ @redis.del(key)
+ val
+ end
+
+ def empty?
+ keys.count == 0
+ end
+
+ def size
+ keys.count
+ end
+
+ def num_waiting
+ keys.count

Return the value stored under the waiting-threads key, converted with to_i

lib/anemone/queue.rb
@@ -0,0 +1,15 @@
+module Anemone
+ module Queue
+
+ def self.Default(*args)

Rename to Basic

lib/anemone/core.rb
@@ -151,8 +161,17 @@ module Anemone
@urls.delete_if { |url| !visit_link?(url) }
return if @urls.empty?
- link_queue = Queue.new
- page_queue = Queue.new
+ # create link queue
+ unless Queue.respond_to?(@opts[:link_queue_adapter])
+ raise "Unknown link queue adapter #{@opts[:link_queue_adapter]}"
+ end
+ link_queue = Queue.send(@opts[:link_queue_adapter], :link, @opts[:link_queue_options])

link_queue = @opts[:link_queue] || Anemone::Queue.Default
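
The fallback in this suggestion can be sketched on its own. `queue_for` is a hypothetical helper, and a plain stdlib `Queue` stands in for the default adapter; neither name comes from Anemone.

```ruby
require 'thread'

# Hypothetical helper: use the caller-supplied queue if present,
# otherwise fall back to a fresh in-memory Queue.
def queue_for(opts, key)
  opts[key] || ::Queue.new
end

custom     = ::Queue.new
link_queue = queue_for({ :link_queue => custom }, :link_queue) # returns custom
page_queue = queue_for({}, :page_queue)                        # returns a new Queue
```

Because `||` only evaluates its right-hand side when the option is missing, no default queue is built when the caller passes one in.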

lib/anemone/core.rb
@@ -55,7 +57,15 @@ module Anemone
# proxy server port number
:proxy_port => false,
# HTTP read timeout in seconds
- :read_timeout => nil
+ :read_timeout => nil,
+ # use default queue adapter for link queue
+ :link_queue_adapter => :Default,
+ # link queue options sub-hash
+ :link_queue_options => {},

No need for options here, as they are passed into Queue.Redis

Scott Reis added some commits
Scott Reis use redis list instead of multiple keys for queue, rename Default to …
…Basic, update tests, and remove default adapter options (since they will be passed into the adapters explicitly
8fd91ab
Scott Reis refactor num_waiting to not use brpop since it blocks the socket 6d97652
lib/anemone/queue/redis.rb
((6 lines not shown))
+end
+
+module Anemone
+ module Queue
+ class Redis
+
+ def initialize(opts = {})
+ @redis = ::Redis.new(opts)
+ @list = "#{opts[:key_prefix] || 'anemone'}:#{self.hash.abs}"
+ @waiting = "#{@list}:waiting"
+ @timeout = opts[:timeout] || 10
+ clear
+ end
+
+ def <<(job)
+ @redis.lpush(@list,job.to_json)

Lazy-load a thread-local Redis connection

lib/anemone/queue/redis.rb
((10 lines not shown))
+ class Redis
+
+ def initialize(opts = {})
+ @redis = ::Redis.new(opts)
+ @list = "#{opts[:key_prefix] || 'anemone'}:#{self.hash.abs}"
+ @waiting = "#{@list}:waiting"
+ @timeout = opts[:timeout] || 10
+ clear
+ end
+
+ def <<(job)
+ @redis.lpush(@list,job.to_json)
+ end
+
+ def deq
+ json = @redis.rpop(@list)

Lazy load redis here too

lib/anemone/queue/redis.rb
((36 lines not shown))
+ def empty?
+ size == 0
+ end
+
+ def size
+ @redis.llen(@list)
+ end
+
+ def num_waiting
+ @redis.get(@waiting).to_i
+ end
+
+ def clear
+ @redis.del(@list, @waiting)
+ end
+

def redis
Thread.current[:redis] ||= ::Redis.new(@opts)
end
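
As a rough illustration of the lazy thread-local lookup, here is a sketch that runs without Redis. `FakeConnection` and `ConnectionHolder` are hypothetical names; the only real API used is `Thread.current[]`.

```ruby
# Hypothetical stand-in for a network client; one instance models one connection.
class FakeConnection
  def initialize(opts = {})
    @opts = opts
  end
end

class ConnectionHolder
  def initialize(opts = {})
    @opts = opts
  end

  # Create the connection on first use in each thread, then reuse it.
  def connection
    Thread.current[:fake_connection] ||= FakeConnection.new(@opts)
  end
end

holder     = ConnectionHolder.new
main_conn  = holder.connection
same_conn  = holder.connection                        # same object as main_conn
other_conn = Thread.new { holder.connection }.value   # a different object
```

One caveat worth keeping in mind: the key lives on the thread, not on the object, so two holders used from the same thread share one connection even if their options differ.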

lib/anemone/queue/redis.rb
@@ -0,0 +1,54 @@
+begin
+ require 'redis'
+rescue LoadError
+ puts "You need the redis gem to use Anemone::Queue::Redis"
+ exit
+end
+
+module Anemone
+ module Queue
+ class Redis
+
+ def initialize(opts = {})
+ @redis = ::Redis.new(opts)

Store opts in instance var

@bernd

I've been using this since November. Works fine for me.

@leehambley

If I understand this correctly, this switches the in-memory queue to Redis, along with other memory-control patches?

Commits on Oct 21, 2011
  1. update gitignore to ignore files created by intellij

    Scott Reis authored
  2. update code and tests so the queue spec runs at all

    Scott Reis authored
  3. fill in redis queue adapter and tests

    Scott Reis authored
Commits on Oct 24, 2011
  1. slightly change how queue type and options are passed, update tests a…

    Scott Reis authored
    …ccordingly, and integrate the queue adapters into the core of anemone
Commits on Oct 25, 2011
  1. use redis list instead of multiple keys for queue, rename Default to …

    Scott Reis authored
    …Basic, update tests, and remove default adapter options (since they will be passed into the adapters explicitly
Commits on Oct 28, 2011
  1. use thread local variable for redis connection so we can block and av…

    Scott Reis authored
    …oid polling
3  .gitignore
@@ -2,3 +2,6 @@
Gemfile.lock
test.db
test.tch
+.bundle/*
+.idea/*
+*.iml
1  Gemfile
@@ -2,6 +2,7 @@ source :rubygems
gem 'nokogiri'
gem 'robots'
+gem 'json'
group :development do
gem 'fakeweb'
12 lib/anemone/core.rb
@@ -1,15 +1,18 @@
require 'thread'
require 'robots'
+require 'json'
require 'anemone/tentacle'
require 'anemone/page'
require 'anemone/exceptions'
require 'anemone/page_store'
require 'anemone/storage'
require 'anemone/storage/base'
+require 'anemone/queue'
+require 'anemone/queue/base'
module Anemone
- VERSION = '0.6.1';
+ VERSION = '0.6.1'
#
# Convenience method to start a crawl
@@ -151,8 +154,11 @@ def run
@urls.delete_if { |url| !visit_link?(url) }
return if @urls.empty?
- link_queue = Queue.new
- page_queue = Queue.new
+ # create link queue
+ link_queue = @opts[:link_queue] || Anemone::Queue.Basic
+
+ # create page queue
+ page_queue = @opts[:page_queue] || Anemone::Queue.Basic
@opts[:threads].times do
@tentacles << Thread.new { Tentacle.new(link_queue, page_queue, @opts).run }
2  lib/anemone/page_store.rb
@@ -66,7 +66,7 @@ def shortest_paths!(root)
root = URI(root) if root.is_a?(String)
raise "Root node not found" if !has_key?(root)
- q = Queue.new
+ q = ::Queue.new
q.enq root
root_page = self[root]
15 lib/anemone/queue.rb
@@ -0,0 +1,15 @@
+module Anemone
+ module Queue
+
+ def self.Basic(*args)
+ require 'anemone/queue/basic'
+ self::Basic.new(*args)
+ end
+
+ def self.Redis(*args)
+ require 'anemone/queue/redis'
+ self::Redis.new(*args)
+ end
+
+ end
+end
44 lib/anemone/queue/base.rb
@@ -0,0 +1,44 @@
+require 'anemone/queue/exceptions'
+
+module Anemone
+ module Queue
+ class Base
+
+ def initialize(adapter)
+ @adap = adapter
+
+ # verify adapter conforms to this class's methods
+ methods.each do |method|
+ if !@adap.respond_to?(method.to_sym)
+ raise "Queue adapter must support method #{method}"
+ end
+ end
+ end
+
+ def <<(job)
+ @adap << job rescue raise InsertionError, $!
+ end
+
+ def deq
+ @adap.deq rescue raise RetrievalError, $!
+ end
+
+ def empty?
+ @adap.empty? rescue raise GenericError, $!
+ end
+
+ def size
+ @adap.size rescue raise GenericError, $!
+ end
+
+ def num_waiting
+ @adap.num_waiting rescue raise GenericError, $!
+ end
+
+ def clear
+ @adap.clear rescue raise GenericError, $!
+ end
+
+ end
+ end
+end
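
The conformance check in `Base` can also be expressed against an explicit list of method names. This is a sketch under assumptions, not the PR's implementation: `REQUIRED_QUEUE_METHODS` and `missing_queue_methods` are hypothetical names, and the list simply mirrors the six methods `Base` wraps.

```ruby
require 'thread'

# Hypothetical explicit interface: the six methods the Base wrapper forwards.
REQUIRED_QUEUE_METHODS = [:<<, :deq, :empty?, :size, :num_waiting, :clear].freeze

# Return the required methods that the adapter does not respond to.
def missing_queue_methods(adapter)
  REQUIRED_QUEUE_METHODS.reject { |name| adapter.respond_to?(name) }
end

missing_queue_methods(::Queue.new) # the stdlib Queue provides all six
missing_queue_methods([])          # an Array lacks deq and num_waiting
```

An explicit list keeps the check limited to the queue interface, so unrelated methods inherited from `Object` do not enter into it.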
11 lib/anemone/queue/basic.rb
@@ -0,0 +1,11 @@
+module Anemone
+ module Queue
+ class Basic < DelegateClass ::Queue
+
+ def initialize
+ super ::Queue.new
+ end
+
+ end
+ end
+end
15 lib/anemone/queue/exceptions.rb
@@ -0,0 +1,15 @@
+module Anemone
+ module Queue
+
+ class GenericError < Error; end
+
+ class ConnectionError < Error; end
+
+ class RetrievalError < Error; end
+
+ class InsertionError < Error; end
+
+ class CloseError < Error; end
+
+ end
+end
54 lib/anemone/queue/redis.rb
@@ -0,0 +1,54 @@
+begin
+ require 'redis'
+rescue LoadError
+ puts "You need the redis gem to use Anemone::Queue::Redis"
+ exit
+end
+
+module Anemone
+ module Queue
+ class Redis
+
+ def initialize(opts = {})
+ @opts = opts
+ @list = "#{@opts[:key_prefix] || 'anemone'}:#{self.hash.abs}"
+ @waiting = "#{@list}:waiting"
+ clear
+ end
+
+ def <<(job)
+ redis.lpush(@list,job.to_json)
+ end
+
+ def deq
+ redis.incr(@waiting)
+ job = redis.brpop(@list, @opts[:timeout] || 0)
+ redis.decr(@waiting)
+ JSON.parse(job.last) rescue nil
+ end
+
+ def empty?
+ size == 0
+ end
+
+ def size
+ redis.llen(@list)
+ end
+
+ def num_waiting
+ redis.get(@waiting).to_i
+ end
+
+ def clear
+ redis.del(@list, @waiting)
+ end
+
+ private
+
+ def redis
+ Thread.current[:redis] ||= ::Redis.new(@opts)
+ end
+
+ end
+ end
+end
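
Since this adapter serializes each job with `to_json` and reads it back with `JSON.parse`, a job that goes through the queue is not always identical to what went in. The sketch below shows the round trip in isolation; `round_trip` is a hypothetical helper, not part of the PR.

```ruby
require 'json'

# Hypothetical helper: what a job looks like after to_json and JSON.parse.
def round_trip(job)
  JSON.parse(job.to_json)
end

string_keys = round_trip({ 'foo' => 'bar' }) # comes back unchanged
symbol_keys = round_trip({ :foo => 'bar' })  # keys come back as strings
```

Jobs built with string keys, as in the spec's `test_data`, survive unchanged, while symbol keys come back as strings.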
97 spec/queue_spec.rb
@@ -0,0 +1,97 @@
+$:.unshift(File.dirname(__FILE__))
+require 'spec_helper'
+
+%w[basic redis].each { |file| require "anemone/queue/#{file}.rb" }
+
+module Anemone
+ describe Queue do
+
+ it "should have a class method to produce a default queue" do
+ Anemone::Queue.should respond_to(:Basic)
+ Anemone::Queue.Basic.should be_an_instance_of(Queue::Basic)
+ end
+
+ it "should have a class method to produce a Redis queue" do
+ Anemone::Queue.should respond_to(:Redis)
+ Anemone::Queue.Redis.should be_an_instance_of(Queue::Redis)
+ end
+
+ module Queue
+ shared_examples_for :an_adapter do
+
+ let(:test_data) { {'foo' => 'bar', 'herp' => 'derp'} }
+
+ it 'should implement << and deq' do
+ @queue.should respond_to(:<<)
+ @queue.should respond_to(:deq)
+ @queue << test_data
+ @queue.size.should == 1
+ @queue.deq.should == test_data
+ @queue.size.should == 0
+ end
+
+ it 'should implement empty?' do
+ @queue.should respond_to(:empty?)
+
+ @queue.empty?.should be_true
+
+ @queue << test_data
+ @queue.empty?.should be_false
+
+ @queue.deq
+ @queue.empty?.should be_true
+ end
+
+ it 'should implement size' do
+ @queue.should respond_to(:size)
+
+ @queue.size.should == 0
+
+ @queue << test_data
+ @queue.size.should == 1
+ end
+
+ it 'should implement num_waiting' do
+ @queue.should respond_to(:num_waiting)
+
+ @queue.num_waiting.should == 0
+
+ 3.times { Thread.new { @queue.deq } }
+
+ [3,2,1,0].each do |n|
+ sleep(0.2)
+ @queue.num_waiting.should == n
+ @queue << test_data
+ end
+ end
+
+ it 'should implement clear' do
+ @queue.should respond_to(:clear)
+
+ @queue << test_data
+ @queue.clear
+ @queue.size.should == 0
+ end
+
+ end
+
+ describe Basic do
+ it_should_behave_like :an_adapter
+
+ before(:each) { @queue = Queue.Basic }
+ after(:all) { @queue = nil }
+
+ end
+
+ describe Redis do
+ it_should_behave_like :an_adapter
+
+ before(:all) { @queue = Queue.Redis }
+ after(:each) { @queue.clear }
+ after(:all) { @queue = nil }
+
+ end
+
+ end
+ end
+end