public
Description: Yet Another Planet Refactoring
Homepage: http://intertwingly.net/blog/2007/12/19/Yet-Another-Planet-Refactoring
Clone URL: git://github.com/rubys/mars.git
Search Repo:
Sam Ruby (author)
Thu Apr 03 17:55:30 -0700 2008
commit  594cd30192668c6310b3236b978d5d4a6d706fb7
tree    fb2c98375a3f69e66dc22da049a8c7e26438d717
parent  775bc2a397c7812ae67b9979f288c3c835aab059
mars / planet / fido.rb
100644 185 lines (160 sloc) 5.777 kb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
require 'addressable/uri'
require 'digest/md5'
require 'net/https'
require 'stringio'
require 'thread'
require 'timeout'
require 'yaml'
require 'zlib'
 
require 'planet/log'
 
module Planet
 
  # map a URI to a readable and (relatively) unique filename
  def Planet.filename uri
    name = uri_norm(uri)
    name.sub!(/^\w+:\/*(\w+:|www\.)?/,'') # remove scheme and www.
    name.gsub! /[?\/:|]+/, ',' # replace separator characters
    name.sub! /^[,.]*/, '' # remove initial junk
    name.sub! /[,.]*$/, '' # remove final junk
 
    if name.length > 250
      parts, excess = name.split(','), []
      excess << parts.pop while parts.join(',').length > 220
      parts << Digest::MD5.hexdigest(excess.join(','))
      name = parts.join(',')
    end
 
    name
  end
 
  class Fido
    attr_accessor :cache, :redirect_limit, :threads, :timeout
 
    def initialize cache
      @cache = cache
      @timeout = 30
      @threads = 6
      @redirect_limit = 10
    end
 
    # invoke fetch on a list of uris in parallel
    def each(uris)
      lock = Mutex.new
      queue = uris.clone
 
      threads = []
      @threads.times do |i|
        threads[i] = Thread.new {
          while uri = lock.synchronize {queue.pop}
            begin
              response = fetch(Planet::uri_norm(uri), redirect_limit)
              yield uri, response
              write_to_cache uri, response
            rescue Exception => e
              Planet.log.error e.inspect
              Planet.log.error uri
              e.backtrace.each {|line| Planet.log.error line}
            end
          end
        }
      end
 
      # wait for each to complete
      threads.each {|thread| thread.join}
    end
 
    # fetch a uri, processing up to redirect_limit number of redirects
    def fetch uri, redirect_limit=10
      cachefile = File.join(@cache, Planet.filename(uri))
 
      # handle permanent redirects and gone
      if File.exist? cachefile
        cache = File.open(cachefile) {|file| YAML::load file.read}
        return cache if cache.code == '410'
        if cache.code == '301' and redirect_limit > 0
          location = cache['location']
          if location
            return fetch(Planet::uri_norm(uri,location), redirect_limit-1)
          end
        end
      else
        cache = {}
      end
 
      # issue the request, handling timeout, ssl, etc.
      response = begin
        uri = URI.parse(uri)
        Timeout::timeout(@timeout) {
          http = Net::HTTP::new(uri.host, uri.port)
 
          if uri.scheme == 'https'
            http.use_ssl = true
            http.verify_mode = OpenSSL::SSL::VERIFY_NONE
          end
 
          http.start {
            request = Net::HTTP::Get.new(uri.request_uri)
            request['If-None-Match'] ||= cache['Etag']
            request['If-Modified-Since'] ||= cache['Last-Modified']
            request['USER-Agent'] = 'Mars'
            request['Accept-Encoding'] = 'gzip, deflate'
            http.request(request)
          }
        }
      rescue Timeout::Error => error
        Net::HTTPRequestTimeOut.new '1.1', '408', error.to_s
      rescue SocketError, Errno::ECONNRESET => error
        Net::HTTPInternalServerError.new '1.1', '500', error.to_s
      end
 
      # expand gzip and deflated responses
      if response.code == '200' and response.body
        case response['content-encoding']
        when 'gzip', 'x-gzip'
          gz = Zlib::GzipReader.new(StringIO.new(response.body))
          response.instance_eval {@body = gz.read}
          gz.close
          response.delete('content-encoding')
        when 'deflate'
          response.instance_eval {@body = Zlib::Inflate.inflate(response.body)}
          response.delete('content-encoding')
        end
      end
 
      # not all servers handle conditional gets, so while not much can be
      # done about the bandwidth, but if the response body is identical
      # the downstream processing (parsing, caching, ...) can be avoided.
      if response.code == '200' and cache.respond_to? :body
        if response.body == cache.body
          response = Net::HTTPNotModified.new('1.0', '304', 'Not Modified')
        end
      end
 
      # handle redirects
      if %w[301 302 307].include? response.code and redirect_limit > 0
        location = response['location']
        if location
          return fetch(Planet::uri_norm(uri.to_s,location), redirect_limit - 1)
        end
      end
 
      # log the response and save the actual content location used
      level = (response.code<'400' ? :info : :warn)
      Planet.log.send level, "#{response.code} #{uri}"
      response.header['Content-Location'] ||= uri.to_s
 
      response
    rescue Timeout::Error
      raise
    rescue Exception => e
      response = Net::HTTPInternalServerError.new('1.0', '500', e.to_s)
      response.header['Content-Location'] ||= uri.to_s
 
      Planet.log.error "#{response.code} #{uri}"
      Planet.log.error e.inspect
      e.backtrace.each {|line| Planet.log.error line}
 
      response
    end
 
    # update cache with successful and permanent responses
    def write_to_cache uri, response
      if %w[200 301 410].include? response.code
        cachefile = File.join(@cache, Planet.filename(uri))
        File.open(cachefile,'w') {|file| file.write(response.to_yaml)}
      end
    end
 
    # fetch previous successful response from cache
    def read_from_cache uri
      cachefile = File.join(@cache, Planet.filename(uri))
      File.open(cachefile) {|file| YAML::load file.read}
    end
  end
 
  # convenience method to normalize a URI
  def Planet.uri_norm *parts
    begin
      Addressable::URI.join(*parts).normalize.to_s
    rescue Exception => e
      Planet.log.warn "#{e} #{parts.inspect}"
      parts.last
    end
  end
end