diff --git a/tumblr_machine.rb b/tumblr_machine.rb index 96e6810..9d854ed 100644 --- a/tumblr_machine.rb +++ b/tumblr_machine.rb @@ -96,14 +96,27 @@ def json(code, message) get '/' do check_logged + + skippable_tumblr_ids = + Tumblr. + select(:id). + where('tumblrs.last_reblogged_post is not null and tumblrs.last_reblogged_post > ?', (DateTime.now << 1)) + @total_posts = Post.count + @number_waiting_posts = Post. where('skip is not ?', true). where('posted = ?', false). where('tumblr_id not in (?)', skippable_tumblr_ids). where('score >= ?', MIN_SCORE). count - @posts = next_posts().limit(500).to_a + + @posts = Post. + where('skip is not ?', true). + where(:posted => false). + where('tumblr_id not in (?)', skippable_tumblr_ids). + order(Sequel.desc(:score), Sequel.desc(:fetched)). + limit(500).to_a tags_with_score = tags_with_score_in_hash @@ -328,14 +341,12 @@ def fetch_tags(tags_names) tags_with_score = tags_with_score_in_hash - fingerprints = {} - semaphore = Mutex.new found_posts.each do |found_post| begin if (post = create_post(found_post, tags_with_score)) posts_count += 1 if post.img_url && (post.score >= MIN_SCORE) && (!post.img_saved) - hydra.queue(create_storage_request(post, fingerprints, semaphore)) + hydra.queue(create_storage_request(post)) end end rescue Exception => e @@ -344,22 +355,6 @@ def fetch_tags(tags_names) end hydra.run - fingerprints.each_pair do |post_id, fingerprint| - post = Post.where(:id => post_id).first - if fingerprint - post.update({:img_saved => true, :fingerprint => fingerprint}) - unless Post. - where('fingerprint is not null'). - where('id != ?', post_id). - where('hamming(fingerprint, (select fingerprint from posts where id = ?)) >= ?', post_id, DUPLICATE_LEVEL). - empty? - post.update({:skip => true}) - end - else - post.update({:img_saved => true}) - end - end - Tag.where(:name => tags_names).update(:last_fetch => DateTime.now) posts_count end @@ -367,10 +362,8 @@ def fetch_tags(tags_names) # Create the request to store an image # @params post [Post] the post we do the stuff for - # @param fingerprints [Hash] hash to add fingerprint - # @param semaphore [Mutex] a mutex to synchronize # @return []Typhoeus::Request} the Request to add - def create_storage_request(post, fingerprints, semaphore) + def create_storage_request(post) request = Typhoeus::Request.new post.img_url request.on_complete do |response| if response.code == 200 @@ -380,12 +373,22 @@ def create_storage_request(post, fingerprints, semaphore) file.write response.body end - semaphore.synchronize do - if File.exist? dest_file - post_fingerprint = Phashion::Image.new(dest_file).fingerprint - fingerprints[post.id] = Sequel.lit("B'#{post_fingerprint.to_s(2).rjust(64, '0')}'") - else - fingerprints[post.id] = nil + if File.exist? dest_file + post_fingerprint = Phashion::Image.new(dest_file).fingerprint + sql_fingerprint = Sequel.lit("B'#{post_fingerprint.to_s(2).rjust(64, '0')}'") + + DATABASE.transaction do + DATABASE[ + 'UPDATE posts SET img_saved = TRUE, fingerprint = ?, skip = exists(select * from posts p2 where p2.fingerprint is not null and p2.id != ? and hamming(p2.fingerprint, ?) >= ?) WHERE id = ?', + sql_fingerprint, + post.id, + sql_fingerprint, + DUPLICATE_LEVEL, + post.id].update + end + else + DATABASE.transaction do + post.update({:img_saved => true}) end end end @@ -405,7 +408,7 @@ def create_post(values, tags_with_score) tumblr_name = values[:tumblr_name] tumblr_url = values[:tumblr_url] DATABASE.transaction do - DATABASE['INSERT INTO tumblrs (name, url) SELECT ?, ? WHERE NOT EXISTS (SELECT id FROM tumblrs WHERE url = ?)', tumblr_name, tumblr_url, tumblr_url].all + DATABASE['INSERT INTO tumblrs (name, url) SELECT ?, ? WHERE NOT EXISTS (SELECT id FROM tumblrs WHERE url = ?)', tumblr_name, tumblr_url, tumblr_url].insert end tumblr = Tumblr.first(:url => values[:tumblr_url]) if tumblr.name != tumblr_name @@ -433,7 +436,7 @@ def create_post(values, tags_with_score) post_insert_query = "INSERT INTO posts (#{posts_insert_params.collect { |param| param[0] }.join(', ')}) SELECT #{Array.new(posts_insert_params.length, '?').join(', ')} WHERE NOT EXISTS (SELECT id FROM posts WHERE tumblr_post_id = ?)" DATABASE.transaction do - DATABASE[post_insert_query, *posts_insert_params.collect { |param| param[1] }, values[:id]].all + DATABASE[post_insert_query, *posts_insert_params.collect { |param| param[1] }, values[:id]].insert end Post.first(:tumblr_post_id => values[:id]) end @@ -451,22 +454,6 @@ def reblog(post) update(:last_reblogged_post => DateTime.now) end - # Finder for the next posts - def next_posts - Post. - where('skip is not ?', true). - where(:posted => false). - where('tumblr_id not in (?)', skippable_tumblr_ids). - order(Sequel.desc(:score), Sequel.desc(:fetched)) - end - - def skippable_tumblr_ids - @skippable_tumblr_ids ||= - Tumblr. - select(:id). - where('tumblrs.last_reblogged_post is not null and tumblrs.last_reblogged_post > ?', (DateTime.now << 1)) - end - def tags_with_score_in_hash Tag.where('value != ?', 0).to_hash(:name, :value) end