Skip to content
This repository has been archived by the owner on May 5, 2019. It is now read-only.

Commit

Permalink
cleaner code
Browse files Browse the repository at this point in the history
  • Loading branch information
archiloque committed Jan 26, 2016
1 parent ece9a95 commit c916111
Showing 1 changed file with 34 additions and 47 deletions.
81 changes: 34 additions & 47 deletions tumblr_machine.rb
Expand Up @@ -96,14 +96,27 @@ def json(code, message)

get '/' do
check_logged

skippable_tumblr_ids =
Tumblr.
select(:id).
where('tumblrs.last_reblogged_post is not null and tumblrs.last_reblogged_post > ?', (DateTime.now << 1))

@total_posts = Post.count

@number_waiting_posts = Post.
where('skip is not ?', true).
where('posted = ?', false).
where('tumblr_id not in (?)', skippable_tumblr_ids).
where('score >= ?', MIN_SCORE).
count
@posts = next_posts().limit(500).to_a

@posts = Post.
where('skip is not ?', true).
where(:posted => false).
where('tumblr_id not in (?)', skippable_tumblr_ids).
order(Sequel.desc(:score), Sequel.desc(:fetched)).
limit(500).to_a

tags_with_score = tags_with_score_in_hash

Expand Down Expand Up @@ -328,14 +341,12 @@ def fetch_tags(tags_names)

tags_with_score = tags_with_score_in_hash

fingerprints = {}
semaphore = Mutex.new
found_posts.each do |found_post|
begin
if (post = create_post(found_post, tags_with_score))
posts_count += 1
if post.img_url && (post.score >= MIN_SCORE) && (!post.img_saved)
hydra.queue(create_storage_request(post, fingerprints, semaphore))
hydra.queue(create_storage_request(post))
end
end
rescue Exception => e
Expand All @@ -344,33 +355,15 @@ def fetch_tags(tags_names)
end
hydra.run

fingerprints.each_pair do |post_id, fingerprint|
post = Post.where(:id => post_id).first
if fingerprint
post.update({:img_saved => true, :fingerprint => fingerprint})
unless Post.
where('fingerprint is not null').
where('id != ?', post_id).
where('hamming(fingerprint, (select fingerprint from posts where id = ?)) >= ?', post_id, DUPLICATE_LEVEL).
empty?
post.update({:skip => true})
end
else
post.update({:img_saved => true})
end
end

Tag.where(:name => tags_names).update(:last_fetch => DateTime.now)
posts_count
end


# Create the request to store an image
# @params post [Post] the post we do the stuff for
# @param fingerprints [Hash<Integer, String>] hash to add fingerprint
# @param semaphore [Mutex] a mutex to synchronize
# @return []Typhoeus::Request} the Request to add
def create_storage_request(post, fingerprints, semaphore)
def create_storage_request(post)
request = Typhoeus::Request.new post.img_url
request.on_complete do |response|
if response.code == 200
Expand All @@ -380,12 +373,22 @@ def create_storage_request(post, fingerprints, semaphore)
file.write response.body
end

semaphore.synchronize do
if File.exist? dest_file
post_fingerprint = Phashion::Image.new(dest_file).fingerprint
fingerprints[post.id] = Sequel.lit("B'#{post_fingerprint.to_s(2).rjust(64, '0')}'")
else
fingerprints[post.id] = nil
if File.exist? dest_file
post_fingerprint = Phashion::Image.new(dest_file).fingerprint
sql_fingerprint = Sequel.lit("B'#{post_fingerprint.to_s(2).rjust(64, '0')}'")

DATABASE.transaction do
DATABASE[
'UPDATE posts SET img_saved = TRUE, fingerprint = ?, skip = exists(select * from posts p2 where p2.fingerprint is not null and p2.id != ? and hamming(p2.fingerprint, ?) >= ?) WHERE id = ?',
sql_fingerprint,
post.id,
sql_fingerprint,
DUPLICATE_LEVEL,
post.id].update
end
else
DATABASE.transaction do
post.update({:img_saved => true})
end
end
end
Expand All @@ -405,7 +408,7 @@ def create_post(values, tags_with_score)
tumblr_name = values[:tumblr_name]
tumblr_url = values[:tumblr_url]
DATABASE.transaction do
DATABASE['INSERT INTO tumblrs (name, url) SELECT ?, ? WHERE NOT EXISTS (SELECT id FROM tumblrs WHERE url = ?)', tumblr_name, tumblr_url, tumblr_url].all
DATABASE['INSERT INTO tumblrs (name, url) SELECT ?, ? WHERE NOT EXISTS (SELECT id FROM tumblrs WHERE url = ?)', tumblr_name, tumblr_url, tumblr_url].insert
end
tumblr = Tumblr.first(:url => values[:tumblr_url])
if tumblr.name != tumblr_name
Expand Down Expand Up @@ -433,7 +436,7 @@ def create_post(values, tags_with_score)
post_insert_query = "INSERT INTO posts (#{posts_insert_params.collect { |param| param[0] }.join(', ')}) SELECT #{Array.new(posts_insert_params.length, '?').join(', ')} WHERE NOT EXISTS (SELECT id FROM posts WHERE tumblr_post_id = ?)"

DATABASE.transaction do
DATABASE[post_insert_query, *posts_insert_params.collect { |param| param[1] }, values[:id]].all
DATABASE[post_insert_query, *posts_insert_params.collect { |param| param[1] }, values[:id]].insert
end
Post.first(:tumblr_post_id => values[:id])
end
Expand All @@ -451,22 +454,6 @@ def reblog(post)
update(:last_reblogged_post => DateTime.now)
end

# Finder for the next posts
def next_posts
Post.
where('skip is not ?', true).
where(:posted => false).
where('tumblr_id not in (?)', skippable_tumblr_ids).
order(Sequel.desc(:score), Sequel.desc(:fetched))
end

def skippable_tumblr_ids
@skippable_tumblr_ids ||=
Tumblr.
select(:id).
where('tumblrs.last_reblogged_post is not null and tumblrs.last_reblogged_post > ?', (DateTime.now << 1))
end

def tags_with_score_in_hash
Tag.where('value != ?', 0).to_hash(:name, :value)
end
Expand Down

0 comments on commit c916111

Please sign in to comment.