astro/harvester: a web-based feed aggregator written in Ruby
(repository URL has read+write access)

Author: Astro
Date:   Wed Jul 15 18:54:47 -0700 2009
commit  081d7ec4a7a3a2de2141632987c05247af1affc3
tree    78e5e20985d338f9ff84735248dd4418a86dfc38
parent  fae28cff0cf3812aac245ef93f108d2f010f6cb4

File: harvester/fetch.rb (mode 100755, 196 lines, 168 sloc, 6.03 KB)
[Line-number gutter 1–196 from the file viewer omitted.]
#!/usr/bin/env ruby
 
require 'dbi'
require 'yaml'
require 'uri'
require 'net/http'
begin
  require 'net/https'
rescue LoadError
  $stderr.puts "WARNING: No https support!"
end
begin
  require 'fastthread'
rescue LoadError
  require 'thread'
end
# Make an exception that escapes any fetcher thread abort the whole run,
# rather than only ending that one thread.
Thread.abort_on_exception = true
 
require 'mrss'
 
 
# Load settings from config.yaml. YAML::load_file opens, parses and closes the
# file, so no file handle is left open for the lifetime of the script.
# (config.yaml is a local, operator-controlled file, so plain YAML loading is
# acceptable here.)
config = YAML::load_file 'config.yaml'
# Seconds the wait loop at the end of the script allows after the most
# recently started request before reporting outstanding feeds as timed out.
timeout = config['settings']['timeout'].to_i
# Upper bound on the size of a response body that will be parsed and stored.
sizelimit = config['settings']['size limit'].to_i
dbi = DBI::connect(config['db']['driver'], config['db']['user'], config['db']['password'])
 
# Hack, an explicit lock would look much better
#
# Wraps DBI's transaction so no other thread is scheduled while a feed's
# updates are written (Thread.critical is a Ruby 1.8 API). The arguments and
# the block are passed through unchanged by the bare `super`.
class << dbi
  def transaction(*a)
    Thread::critical = true
    super
  ensure
    # Reset the flag even when the transaction block raised (parse or DB
    # errors are rescued by the caller); otherwise Thread.critical would be
    # left set after a failed feed.
    Thread::critical = false
  end
end
 
#######################
# Database maintenance
#######################

puts "Looking for sources to purge..."
# Collect every (collection, feed) row in `sources` whose feed is no longer
# configured for that collection.
purge = []
dbi.select_all("SELECT collection, rss FROM sources") { |dbc,dbr|
  purge << [dbc, dbr] unless (config['collections'][dbc] || []).include? dbr
}

purge_rss = []
purge.each { |c,r|
  puts "Removing #{c}:#{r}..."
  dbi.do "DELETE FROM sources WHERE collection=? AND rss=?", c, r
  purge_rss << r
}

# Items are deleted by feed URL alone (see the DELETE below), so a feed's
# items may only go once no collection lists that feed any more. A feed that
# was dropped from several collections is checked and purged only once, and an
# empty (nil) collection in config.yaml is treated as listing nothing.
purge_rss.uniq!
purge_rss.delete_if { |r|
  keeper = config['collections'].find { |_cfc, cfr| (cfr || []).include? r }
  puts "Must keep #{r} because it's still in #{keeper[0]}" if keeper
  keeper
}
purge_rss.each { |r|
  puts "Purging items from feed #{r}"
  dbi.do "DELETE FROM items WHERE rss=?", r
}
 
###########
# Fetching
###########
 
# Length of the longest configured feed URL; log prefixes are padded to this
# width so the output lines up.
maxurlsize = 0
config['collections'].each_value do |urls|
  urls.each do |url|
    maxurlsize = url.size if url.size > maxurlsize
  end
end
 
# Writes for each feed are grouped in dbi.transaction below instead of being
# committed statement by statement.
dbi['AutoCommit'] = false
# Start time of the most recently issued HTTP request; the wait loop after
# this section gives up `timeout` seconds after it.
last_get_started = Time.new
# Feed URLs whose worker thread has not finished yet. A URL configured in
# several collections is pushed once per collection, so every worker removes
# exactly one occurrence when it is done.
pending = []
pending_lock = Mutex.new

config['collections'].each { |collection,rss_urls|
  rss_urls.each { |rss_url|
    pending_lock.synchronize { pending << rss_url }
    # One worker thread per (collection, feed) pair.
    Thread.new {
      begin
        db_rss, last = dbi.select_one "SELECT rss, last FROM sources WHERE collection=? AND rss=?", collection, rss_url
        is_new = db_rss.nil?

        uri = URI::parse rss_url
        logprefix = "[#{uri.to_s.ljust maxurlsize}]"

        http = Net::HTTP.new uri.host, uri.port
        http.use_ssl = (uri.kind_of? URI::HTTPS) if defined? Net::HTTPS
        # Send If-Modified-Since when a Last-Modified value was stored earlier.
        request = (if is_new or last.nil?
          puts "#{logprefix} GET"
          Net::HTTP::Get.new uri.request_uri
        else
          puts "#{logprefix} GET with If-Modified-Since: #{last}"
          Net::HTTP::Get.new uri.request_uri, {'If-Modified-Since'=>last}
        end)
        request.basic_auth(uri.user, uri.password) if uri.user

        last_get_started = Time.new
        begin
          response = http.request request
        rescue StandardError, Timeout::Error => e
          # Timeout::Error is listed explicitly because on Ruby 1.8 it does
          # not inherit from StandardError. `next` ends this worker; the
          # ensure clause below still removes it from `pending`.
          puts "#{logprefix} #{e.class}: #{e}"
          next
        end
        puts "#{logprefix} #{response.code} #{response.message}"

        # Any response other than 200 OK (e.g. 304) leaves the database untouched.
        if response.kind_of? Net::HTTPOK
          body = response.body
          # Measure bytes, not characters (String#size counts characters on
          # Ruby >= 1.9).
          body_size = body.respond_to?(:bytesize) ? body.bytesize : body.size
          if body_size > sizelimit
            puts "#{logprefix} #{body_size} bytes big!"
          else
            begin
              dbi.transaction do
                rss = MRSS::parse response.body

                # Update source
                if is_new
                  dbi.do "INSERT INTO sources (collection, rss, last, title, link, description) VALUES (?, ?, ?, ?, ?, ?)",
                    collection, rss_url, response['Last-Modified'], rss.title, rss.link, rss.description
                  puts "#{logprefix} Source added"
                else
                  dbi.do "UPDATE sources SET last=?, title=?, link=?, description=? WHERE collection=? AND rss=?",
                    response['Last-Modified'], rss.title, rss.link, rss.description, collection, rss_url
                  puts "#{logprefix} Source updated"
                end

                items_new, items_updated = 0, 0
                rss.items.each { |item|
                  description = item.description

                  # Link mangling: resolve relative item links against the
                  # channel link (or the feed URL when the channel has none).
                  begin
                    link = URI::join((rss.link.to_s == '') ? uri.to_s : rss.link.to_s, item.link || rss.link).to_s
                  rescue URI::Error
                    link = item.link
                  end

                  # Push into database
                  db_title = dbi.select_one "SELECT title FROM items WHERE rss=? AND link=?", rss_url, link
                  item_is_new = db_title.nil?

                  if item_is_new
                    begin
                      dbi.do "INSERT INTO items (rss, title, link, date, description) VALUES (?, ?, ?, ?, ?)",
                        rss_url, item.title, link, item.date, description
                      items_new += 1
                    rescue DBI::ProgrammingError
                      puts description
                      puts "#{$!.class}: #{$!}\n#{$!.backtrace.join("\n")}"
                    end
                  else
                    dbi.do "UPDATE items SET title=?, description=? WHERE rss=? AND link=?",
                      item.title, description, rss_url, link
                    items_updated += 1
                  end

                  # Remove all enclosures
                  dbi.do "DELETE FROM enclosures WHERE rss=? AND link=?", rss_url, link
                  # Re-add all enclosures
                  item.enclosures.each do |enclosure|
                    # Same fallback as for item links: a malformed href is
                    # stored as-is instead of rolling back the whole feed.
                    begin
                      href = URI::join((rss.link.to_s == '') ? link.to_s : rss.link.to_s, enclosure['href']).to_s
                    rescue URI::Error
                      href = enclosure['href']
                    end
                    dbi.do "INSERT INTO enclosures (rss, link, href, mime, title, length) VALUES (?, ?, ?, ?, ?, ?)",
                      rss_url, link, href, enclosure['type'], enclosure['title'], enclosure['length']
                  end
                }
                puts "#{logprefix} New: #{items_new} Updated: #{items_updated}"
              end
            rescue
              puts "#{logprefix} Error: #{$!.class}: #{$!}\n#{$!.backtrace.join("\n")}"
            end
          end
        end
      ensure
        # Remove exactly one occurrence: the same URL may still be pending for
        # another collection.
        pending_lock.synchronize {
          index = pending.index rss_url
          pending.delete_at index if index
        }
      end
    }
  }
}
 
# Poll once per second until every worker has removed its entry from
# `pending`, or until `timeout` seconds have passed since the most recently
# started request.
loop do
  break unless Time.new < last_get_started + timeout
  break if pending.empty?
  sleep 1
end
# Whatever is still listed never finished in time; report each entry.
pending_lock.synchronize do
  pending.each do |url|
    puts "[#{url.ljust maxurlsize}] Timed out"
  end
end