Skip to content

Commit

Permalink
added link_worker.rb
Browse files Browse the repository at this point in the history
  • Loading branch information
c2h2 committed May 7, 2012
1 parent 2acb37d commit 3ed549c
Show file tree
Hide file tree
Showing 8 changed files with 197 additions and 7 deletions.
6 changes: 4 additions & 2 deletions Gemfile
Expand Up @@ -2,9 +2,11 @@ source 'https://rubygems.org'

gem 'rails', '3.2.1'

gem 'mongoid', "~> 2.4"
gem 'bson_ext', "~> 1.5"
gem 'mongoid'
gem 'bson_ext'
gem 'nokogiri'
gem 'bunny'


# Bundle edge Rails instead:
# gem 'rails', :git => 'git://github.com/rails/rails.git'
Expand Down
6 changes: 4 additions & 2 deletions Gemfile.lock
Expand Up @@ -34,6 +34,7 @@ GEM
bson_ext (1.5.2)
bson (= 1.5.2)
builder (3.0.0)
bunny (0.7.9)
coffee-rails (3.2.2)
coffee-script (>= 2.2.0)
railties (~> 3.2.0)
Expand Down Expand Up @@ -126,10 +127,11 @@ PLATFORMS
ruby

DEPENDENCIES
bson_ext (~> 1.5)
bson_ext
bunny
coffee-rails (~> 3.2.1)
jquery-rails
mongoid (~> 2.4)
mongoid
nokogiri
rails (= 3.2.1)
ruby-debug19
Expand Down
2 changes: 1 addition & 1 deletion app/models/link.rb
Expand Up @@ -8,6 +8,6 @@ class Link
field :depth, :type => Integer


embeds_many :pages
has_many :pages

end
2 changes: 1 addition & 1 deletion app/models/page.rb
Expand Up @@ -16,7 +16,7 @@ class Page
field :exp_at, :type => DateTime


embedded_in :link
belongs_to :link

def dl url, ua=nil, cookie=nil, params={}
remain_times = params[:remain]
Expand Down
63 changes: 63 additions & 0 deletions lib/medusa/aux.rb
@@ -0,0 +1,63 @@
require 'logger'

LOG = Logger.new(STDOUT)

### AUX classes ###
# Simple wall-clock stopwatch plus a couple of Unix-timestamp helpers.
class Stopwatch
  # A new stopwatch starts running immediately.
  def initialize
    start
  end

  # (Re)starts the stopwatch.
  #
  # @return [Time] the recorded start time
  def start
    @t0 = Time.now
  end

  # Records the stop time and reports the elapsed time.
  #
  # @return [Float] seconds elapsed since the last #start
  def end
    (@t1 = Time.now) - @t0
  end

  class << self
    # @return [String] current Unix time (whole seconds) as a decimal string
    def ts
      Time.now.to_i.to_s
    end

    # Splits the current Unix-time string into two parts.
    #
    # @return [Array<String>] the first seven characters, then the remainder
    def ts2
      stamp = Time.now.to_i.to_s
      [stamp[0, 7], stamp[7..-1]]
    end
  end
end

require 'digest' # Digest is used below; this file only required 'logger'

# Miscellaneous helpers: SHA-2 digests, a numbered front-end to the shared
# LOG logger, and a logged process exit.
class Util
  # @param str [String] data to hash
  # @return [String] SHA-512 digest as a hex string
  def self.hexsha512(str)
    Digest::SHA512.hexdigest(str)
  end

  # @param str [String] data to hash
  # @return [String] raw (binary) SHA-512 digest
  def self.sha512(str)
    Digest::SHA512.digest(str)
  end

  # @param str [String] data to hash
  # @return [String] SHA-384 digest as a hex string
  def self.hexsha384(str)
    Digest::SHA384.hexdigest(str)
  end

  # @param str [String] data to hash
  # @return [String] raw (binary) SHA-384 digest
  def self.sha384(str)
    Digest::SHA384.digest(str)
  end

  # Writes "<time>|<sequence number>|<message>" to LOG.
  #
  # @param str [#to_s, nil] message; nil is logged as an empty message
  # @param error_level [Integer] values above 1 are logged as warnings,
  #   everything else as info
  # @return [Object] whatever the underlying Logger call returns
  def self.log(str, error_level = 1)
    @counter = (@counter || 0) + 1
    # Interpolation (instead of String#+) keeps non-String messages such as
    # integers or exceptions from raising TypeError.
    line = "#{Time.now}|#{@counter}|#{str}"
    if error_level > 1
      LOG.warn line
    else
      LOG.info line
    end
  end

  # Logs "exiting..." and terminates the process with status 0.
  #
  # @raise [SystemExit] always
  def self.exit
    log "exiting..."
    # A bare `exit 0` here would resolve to Util.exit itself (which takes no
    # arguments) and raise ArgumentError, so call Kernel's exit explicitly.
    Kernel.exit 0
  end
end
12 changes: 12 additions & 0 deletions lib/medusa/fill_dummy.rb
@@ -0,0 +1,12 @@
require_relative 'db.rb'
require_relative 'models.rb'
require_relative 'conf.rb'


# Seeds the database with 1000 Link records whose url is a random string of
# eight uppercase letters and whose state is 0.
1000.times do |i|
  link = Link.new
  # rand(26) covers the whole range 'A'..'Z' (rand(25) could never yield 'Z').
  link.url = Array.new(8) { (65 + rand(26)).chr }.join
  link.state = 0
  # Only claim success when the record was actually persisted.
  if link.save
    puts "#{i} saved!"
  else
    warn "#{i} NOT saved!"
  end
end
2 changes: 1 addition & 1 deletion lib/medusa/link_dispatcher.rb
Expand Up @@ -39,7 +39,7 @@ def feed_rabbit
@new_links.each do |link|
@cnt = @cnt + 1
ylink = link.to_yaml
@exch.publish(ylink)
@exch.publish(ylink, :key=>"links")
@logger.log @cnt
STDOUT.puts @cnt
end
Expand Down
111 changes: 111 additions & 0 deletions lib/medusa/link_worker.rb
@@ -0,0 +1,111 @@
require_relative 'db.rb'
require_relative 'models.rb'
require_relative 'conf.rb'
require_relative 'aux.rb'
require 'bunny'
require 'yaml'
require 'logger'
require 'open-uri'
require 'time'
require 'timeout'


# Queue consumer: pops YAML payloads from the "links" queue, downloads the
# link each one describes and hands the resulting Page to #report.
class Linkworker

  # Connects to the broker and binds the "links" queue to the "links"
  # exchange under the routing key "links".
  #
  # @param host [String] broker host, passed to Bunny.new
  def initialize(host)
    @cnt = 0
    @bunny = Bunny.new(:logging => false, :host => host)
    @bunny.start
    @exch = @bunny.exchange("links")
    @queue = @bunny.queue("links")
    @queue.bind(@exch, :key => "links")

    # TODO: a second exchange/queue pair ("pages") was sketched here but is
    # not enabled:
    # @bunny2 = Bunny.new(:host=>host)
    # @exch2 = @bunny.exchange("pages")
    # @queue2 = @bunny.queue("pages")
    # @queue2 = @queue2.bind(@exch2, :key=>"pages")
  end

  # Processes links forever; never returns.
  def run
    loop { process_one_link }
  end

  # Placeholder for publishing a downloaded page; currently does nothing.
  #
  # @param page [Page] the downloaded page
  def report(page)
  end

  # Takes one job off the queue and downloads it. Sleeps five seconds when
  # the queue has nothing to offer.
  def process_one_link
    link = get_a_job
    if link.nil?
      Util.log "no more job, sleep for a while"
      sleep 5
      return
    end

    page = dl link
    # Only report pages that were actually downloaded.
    report page unless page.nil?
  end

  # Downloads a link, making up to +remain_times+ attempts.
  #
  # @param link [#url] the link to download
  # @param remain_times [Integer] maximum number of attempts
  # @return [Page, nil] the populated page, or nil when every attempt failed
  def dl(link, remain_times = 3)
    1.upto(remain_times) do |attempt|
      begin
        return fetch_page(link)
      rescue => e
        Util.log "Error in dl|#{link.url}|RETRYING#{remain_times - attempt}|#{e}"
      end
    end
    Util.log "Error in DL #{link.url} really failed after #{[remain_times, 0].max} times", 2
    nil
  end

  # Pops one message from the queue and deserializes it.
  #
  # @return [Object, nil] the object loaded from the YAML payload, or nil
  #   when the payload is not a String
  def get_a_job
    item = @queue.pop
    payload = item && item[:payload]
    return nil unless payload.is_a? String

    # NOTE(review): YAML.load can instantiate arbitrary objects; this is only
    # acceptable while everything publishing to the queue is trusted.
    YAML.load payload
  end

  private

  # Performs a single download attempt and copies the response metadata
  # onto a fresh Page. Any error (including a timeout) propagates to #dl.
  #
  # @param link [#url] the link to download
  # @return [Page] the populated page
  def fetch_page(link)
    page = Page.new
    page.link = link
    Util.log "DL #{link.url}"
    sw = Stopwatch.new
    Timeout.timeout(Conf.time(:network)) do
      # Relies on open-uri's Kernel#open hook for URLs.
      open(link.url) do |f|
        @doc = f.read
        page.charset = f.charset
        page.mime = f.content_type
        page.code = f.status[0].to_i

        expires = f.meta["expires"]
        begin
          # The Page model declares this field as exp_at.
          page.exp_at = Time.parse(expires) if expires
        rescue ArgumentError
          # Unparseable Expires header: leave the field unset.
        end

        page.etag = f.meta["etag"]
        page.lm_at = f.last_modified unless f.last_modified.nil?
      end
      page.response_time = (sw.end * 1000).floor
    end
    page
  end
end

# Start a worker against the local broker and process links until killed.
Linkworker.new("localhost").run

0 comments on commit 3ed549c

Please sign in to comment.