joshu / mars forked from rubys/mars

Yet Another Planet Refactoring

This URL has Read+Write access

joshu (author)
Sun Apr 20 14:37:57 -0700 2008
commit  f09cbe485c4ded878a85ae29e47775468404c307
tree    259d311ce05f595c0ed8dcf711c0820559c769e0
parent  127bdf5d62b9f25fa140dc9b73294e99fbe87d71
mars / planet / fido.rb
100644 186 lines (160 sloc) 5.777 kb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
require 'addressable/uri'
require 'digest/md5'
require 'net/https'
require 'stringio'
require 'thread'
require 'timeout'
require 'yaml'
require 'zlib'

require 'planet/log'
 
module Planet
 
  # map a URI to a readable and (relatively) unique filename
  def Planet.filename uri
    name = uri_norm(uri)
    name.sub!(/^\w+:\/*(\w+:|www\.)?/,'') # remove scheme and www.
    name.gsub! /[?\/:|]+/, ',' # replace separator characters
    name.sub! /^[,.]*/, '' # remove initial junk
    name.sub! /[,.]*$/, '' # remove final junk
 
    if name.length > 250
      parts, excess = name.split(','), []
      excess << parts.pop while parts.join(',').length > 220
      parts << Digest::MD5.hexdigest(excess.join(','))
      name = parts.join(',')
    end
 
    name
  end
 
  class Fido
    attr_accessor :cache, :redirect_limit, :threads, :timeout
 
    def initialize cache
      @cache = cache
      @timeout = 30
      @threads = 6
      @redirect_limit = 10
    end
 
    # invoke fetch on a list of uris in parallel
    def each(uris)
      lock = Mutex.new
      queue = uris.clone
 
      threads = []
      @threads.times do |i|
        threads[i] = Thread.new {
          while uri = lock.synchronize {queue.pop}
            begin
              response = fetch(Planet::uri_norm(uri), redirect_limit)
              yield uri, response
              write_to_cache uri, response
            rescue Exception => e
              Planet.log.error e.inspect
              Planet.log.error uri
              e.backtrace.each {|line| Planet.log.error line}
            end
          end
        }
      end
 
      # wait for each to complete
      threads.each {|thread| thread.join}
    end
 
    # fetch a uri, processing up to redirect_limit number of redirects
    def fetch uri, redirect_limit=10
      cachefile = File.join(@cache, Planet.filename(uri))
 
      # handle permanent redirects and gone
      if File.exist? cachefile
        cache = File.open(cachefile) {|file| YAML::load file.read}
        return cache if cache.code == '410'
        if cache.code == '301' and redirect_limit > 0
          location = cache['location']
          if location
            return fetch(Planet::uri_norm(uri,location), redirect_limit-1)
          end
        end
      else
        cache = {}
      end
 
      # issue the request, handling timeout, ssl, etc.
      response = begin
        uri = URI.parse(uri)
        Timeout::timeout(@timeout) {
          http = Net::HTTP::new(uri.host, uri.port)
 
          if uri.scheme == 'https'
            http.use_ssl = true
            http.verify_mode = OpenSSL::SSL::VERIFY_NONE
          end
 
          http.start {
            request = Net::HTTP::Get.new(uri.request_uri)
            request['If-None-Match'] ||= cache['Etag']
            request['If-Modified-Since'] ||= cache['Last-Modified']
            request['USER-Agent'] = 'Mars'
            request['Accept-Encoding'] = 'gzip, deflate'
            http.request(request)
          }
        }
      rescue Timeout::Error => error
        Net::HTTPRequestTimeOut.new '1.1', '408', error.to_s
      rescue SocketError, Errno::ECONNRESET => error
        Net::HTTPInternalServerError.new '1.1', '500', error.to_s
      end
 
      # expand gzip and deflated responses
      if response.code == '200' and response.body
        case response['content-encoding']
        when 'gzip', 'x-gzip'
          gz = Zlib::GzipReader.new(StringIO.new(response.body))
          response.instance_eval {@body = gz.read}
          gz.close
          response.delete('content-encoding')
        when 'deflate'
          response.instance_eval {@body = Zlib::Inflate.inflate(response.body)}
          response.delete('content-encoding')
        end
      end
 
      # not all servers handle conditional gets, so while not much can be
      # done about the bandwidth, but if the response body is identical
      # the downstream processing (parsing, caching, ...) can be avoided.
      if response.code == '200' and cache.respond_to? :body
        if response.body == cache.body
          response = Net::HTTPNotModified.new('1.0', '304', 'Not Modified')
        end
      end
 
      # handle redirects
      if %w[301 302 307].include? response.code and redirect_limit > 0
        location = response['location']
        if location
          return fetch(Planet::uri_norm(uri.to_s,location), redirect_limit - 1)
        end
      end
 
      # log the response and save the actual content location used
      level = (response.code<'400' ? :info : :warn)
      Planet.log.send level, "#{response.code} #{uri}"
      response.header['Content-Location'] ||= uri.to_s
 
      response
    rescue Timeout::Error
      raise
    rescue Exception => e
      response = Net::HTTPInternalServerError.new('1.0', '500', e.to_s)
      response.header['Content-Location'] ||= uri.to_s
 
      Planet.log.error "#{response.code} #{uri}"
      Planet.log.error e.inspect
      e.backtrace.each {|line| Planet.log.error line}
 
      response
    end
 
    # update cache with successful and permanent responses
    def write_to_cache uri, response
      if %w[200 301 410].include? response.code
        cachefile = File.join(@cache, Planet.filename(uri))
        File.open(cachefile,'w') {|file| file.write(response.to_yaml)}
      end
    end
 
    # fetch previous successful response from cache
    def read_from_cache uri
      cachefile = File.join(@cache, Planet.filename(uri))
      File.open(cachefile) {|file| YAML::load file.read}
    end
  end
 
  # convenience method to normalize a URI
  def Planet.uri_norm *parts
    begin
      Addressable::URI.join(*parts).normalize.to_s
    rescue Exception => e
      Planet.log.warn "#{e} #{parts.inspect}"
      parts.last
    end
  end
end