Skip to content

Commit

Permalink
Merge pull request #318 from dsander/threaded-background-workers
Browse files Browse the repository at this point in the history
Provide a new threaded background worker
  • Loading branch information
dsander committed Jun 2, 2014
2 parents 43d0256 + df90322 commit 577b95f
Show file tree
Hide file tree
Showing 8 changed files with 281 additions and 204 deletions.
16 changes: 9 additions & 7 deletions Procfile
@@ -1,11 +1,13 @@
# Procfile for development:
# Procfile for development using the new threaded worker (scheduler, twitter stream and delayed job)
web: bundle exec rails server
schedule: bundle exec rails runner bin/schedule.rb
twitter: bundle exec rails runner bin/twitter_stream.rb
dj: bundle exec script/delayed_job run
jobs: bundle exec rails runner bin/threaded.rb

# Possible Procfile configuration for production:
# web: bundle exec unicorn -c config/unicorn/production.rb
# schedule: bundle exec rails runner bin/schedule.rb
# twitter: bundle exec rails runner bin/twitter_stream.rb
# dj: bundle exec script/delayed_job run
# jobs: bundle exec rails runner bin/threaded.rb

# Old version with separate processes (use this if you have issues with the threaded version)
#web: bundle exec rails server
#schedule: bundle exec rails runner bin/schedule.rb
#twitter: bundle exec rails runner bin/twitter_stream.rb
#dj: bundle exec script/delayed_job run
82 changes: 0 additions & 82 deletions bin/schedule.rb
Expand Up @@ -11,87 +11,5 @@
exit 1
end

require 'rufus/scheduler'

# Drives Huginn's recurring background work: queues agent schedule
# runs, event propagation and expired-event cleanup as delayed jobs.
# A mutex serializes enqueueing across the rufus-scheduler threads.
class HuginnScheduler
  attr_accessor :mutex

  # Enqueue a delayed Agent.run_schedule for the given schedule label.
  def run_schedule(time)
    with_mutex do
      puts "Queuing schedule for #{time}"
      Agent.delay.run_schedule(time)
    end
  end

  # Enqueue a delayed Agent.receive! to propagate pending events.
  def propagate!
    with_mutex do
      puts "Queuing event propagation"
      Agent.delay.receive!
    end
  end

  # Enqueue a delayed Event.cleanup_expired!.
  def cleanup_expired_events!
    with_mutex do
      puts "Running event cleanup"
      Event.delay.cleanup_expired!
    end
  end

  # Run the block while holding the mutex, with an ActiveRecord
  # connection checked out for its duration.
  def with_mutex(&block)
    ActiveRecord::Base.connection_pool.with_connection do
      mutex.synchronize(&block)
    end
  end

  # Register every recurring job on a fresh rufus scheduler and block
  # until it shuts down. TIMEZONE (default Pacific) controls when the
  # cron-style jobs fire.
  def run!
    self.mutex = Mutex.new
    scheduler = Rufus::Scheduler.new
    timezone = ActiveSupport::TimeZone::MAPPING[ENV['TIMEZONE'].present? ? ENV['TIMEZONE'] : "Pacific Time (US & Canada)"]

    # Event propagation runs once a minute.
    scheduler.every('1m') { propagate! }

    # Expired events are purged nightly at midnight.
    scheduler.cron("0 0 * * * " + timezone) { cleanup_expired_events! }

    # The repeating "every_*" agent schedules.
    %w[1m 2m 5m 10m 30m 1h 2h 5h 12h 1d 2d 7d].each do |interval|
      scheduler.every(interval) { run_schedule "every_#{interval}" }
    end

    # One cron entry per hour of the day ("midnight", "9am", "noon", "3pm", ...).
    # Times are assumed to be in PST for now. Can store a user#timezone later.
    (0...24).each do |hour|
      scheduler.cron("0 #{hour} * * * " + timezone) { run_schedule(label_for_hour(hour)) }
    end

    scheduler.join
  end

  private

  # Map an hour of the day (0-23) to Huginn's schedule label.
  def label_for_hour(hour)
    case hour
    when 0     then "midnight"
    when 12    then "noon"
    when 1..11 then "#{hour}am"
    else            "#{hour - 12}pm"
    end
  end
end

# Boot the scheduler; run! blocks until the process is stopped.
HuginnScheduler.new.run!
57 changes: 57 additions & 0 deletions bin/threaded.rb
@@ -0,0 +1,57 @@
require 'thread'

# Ask all three workers to shut down. Each ivar is guarded with a nil
# check because a signal can arrive before the corresponding worker
# thread has assigned it, and the original unguarded calls would raise
# NoMethodError on nil during shutdown.
def stop
  puts 'Exiting...'
  @scheduler.stop if @scheduler
  @dj.stop if @dj
  @stream.stop if @stream
end

# Run the given block, logging any StandardError with its backtrace and
# then shutting the whole process down, so one failed worker does not
# leave the others running silently.
#
# Fixes the original `yield block`, which passed the block object back
# into itself as an argument — pointless and confusing; plain `yield`
# is what was meant. The redundant begin/end wrapper is gone too:
# a method body can carry its own rescue clause.
def safely
  yield
rescue StandardError => e
  STDERR.puts "\nException #{e.message}:\n#{e.backtrace.join("\n")}\n\n"
  STDERR.puts "Terminating myself ..."
  stop
end

# Boot the three workers (Twitter stream, scheduler, delayed_job), each
# on its own thread and wrapped in `safely` so an unhandled error tears
# the process down instead of dying silently.
threads = []

# Twitter stream worker.
threads << Thread.new do
  safely do
    @stream = TwitterStream.new
    @stream.run
    puts "Twitter stream stopped ..."
  end
end

# Scheduler worker.
threads << Thread.new do
  safely do
    @scheduler = HuginnScheduler.new
    @scheduler.run!
    puts "Scheduler stopped ..."
  end
end

# Delayed job worker. delayed/command is required lazily, inside the
# thread, as in the original.
threads << Thread.new do
  safely do
    require 'delayed/command'
    @dj = Delayed::Worker.new
    @dj.start
    puts "Delayed job stopped ..."
  end
end

# We need to wait a bit to let delayed_job set its traps so we can override them
sleep 0.5

trap('TERM') do
  stop
end

trap('INT') do
  stop
end

# Block the main thread until every worker finishes. (`each`, not the
# original `collect`: the mapped array was built only to be discarded.)
threads.each(&:join)
113 changes: 1 addition & 112 deletions bin/twitter_stream.rb
Expand Up @@ -12,115 +12,4 @@
exit 1
end

require 'cgi'
require 'json'
require 'twitter/json_stream'
require 'em-http-request'
require 'pp'

# Connect to the Twitter streaming API with the given agent's OAuth
# credentials and yield each usable status hash to the block.
#
# Uses the "filter" endpoint (tracking the CGI-escaped, comma-joined
# terms) when any filters are present, otherwise "sample". Stalls and
# repeated reconnect failures stop the EventMachine loop so the outer
# supervision loop can reconnect with freshly loaded agents.
def stream!(filters, agent, &block)
  stream = Twitter::JSONStream.connect(
    :path => "/1/statuses/#{(filters && filters.length > 0) ? 'filter' : 'sample'}.json#{"?track=#{filters.map {|f| CGI::escape(f) }.join(",")}" if filters && filters.length > 0}",
    :ssl => true,
    :oauth => {
      :consumer_key => agent.twitter_consumer_key,
      :consumer_secret => agent.twitter_consumer_secret,
      :access_key => agent.twitter_oauth_token,
      :access_secret => agent.twitter_oauth_token_secret
    }
  )

  stream.each_item do |status|
    # Items may arrive as raw JSON strings; parse before inspecting.
    status = JSON.parse(status) if status.is_a?(String)
    next unless status
    next if status.has_key?('delete')   # deletion notices have no text
    next unless status['text']
    # Unescape the HTML entities Twitter encodes and flatten control whitespace.
    status['text'] = status['text'].gsub(/&lt;/, "<").gsub(/&gt;/, ">").gsub(/[\t\n\r]/, ' ')
    block.call(status)
  end

  stream.on_error do |message|
    STDERR.puts " --> Twitter error: #{message} <--"
  end

  stream.on_no_data do |message|
    # No data for a while: stop the reactor so the outer loop reconnects.
    STDERR.puts " --> Got no data for awhile; trying to reconnect."
    EventMachine::stop_event_loop
  end

  stream.on_max_reconnects do |timeout, retries|
    STDERR.puts " --> Oops, tried too many times! <--"
    EventMachine::stop_event_loop
  end
end

# Group agents by their Twitter OAuth token (one stream per credential
# set), build a filter -> [agents] map, and dispatch each matching,
# non-duplicate, non-retweet status to the interested agents.
def load_and_run(agents)
  agents.group_by { |agent| agent.twitter_oauth_token }.each do |oauth_token, agents|
    # Seed every known filter with an empty agent list, filled in below.
    filter_to_agent_map = agents.map { |agent| agent.options[:filters] }.flatten.uniq.compact.map(&:strip).inject({}) { |m, f| m[f] = []; m }

    agents.each do |agent|
      agent.options[:filters].flatten.uniq.compact.map(&:strip).each do |filter|
        filter_to_agent_map[filter] << agent
      end
    end

    # Sliding window of recent tweet ids, used to suppress duplicates.
    recent_tweets = []

    stream!(filter_to_agent_map.keys, agents.first) do |status|
      if status["retweeted_status"].present? && status["retweeted_status"].is_a?(Hash)
        puts "Skipping retweet: #{status["text"]}"
      elsif recent_tweets.include?(status["id_str"])
        puts "Skipping duplicate tweet: #{status["text"]}"
      else
        recent_tweets << status["id_str"]
        recent_tweets.shift if recent_tweets.length > DUPLICATE_DETECTION_LENGTH
        puts status["text"]
        filter_to_agent_map.keys.each do |filter|
          # A filter matches when every one of its words appears in the tweet text.
          if (filter.downcase.split(SEPARATOR) - status["text"].downcase.split(SEPARATOR)).reject(&:empty?) == [] # Hacky McHackerson
            filter_to_agent_map[filter].each do |agent|
              puts " -> #{agent.name}"
              agent.process_tweet(filter, status)
            end
          end
        end
      end
    end
  end
end

# How often the reactor is restarted so agent changes get picked up.
RELOAD_TIMEOUT = 10.minutes
# Size of the sliding window used to suppress duplicate tweets.
DUPLICATE_DETECTION_LENGTH = 1000
# Word separator used when matching filters against tweet text.
SEPARATOR = /[^\w_\-]+/

# Supervision loop: (re)load the TwitterStreamAgents, run the reactor,
# and recover from unexpected errors with a pause instead of crashing.
while true
  begin
    agents = Agents::TwitterStreamAgent.all

    EventMachine::run do
      # Periodically stop the reactor so agents are reloaded from the DB.
      EventMachine.add_periodic_timer(RELOAD_TIMEOUT) {
        puts "Reloading EventMachine and all Agents..."
        EventMachine::stop_event_loop
      }

      if agents.length == 0
        puts "No agents found. Will look again in a minute."
        sleep 60
        EventMachine::stop_event_loop
      else
        puts "Found #{agents.length} agent(s). Loading them now..."
        load_and_run agents
      end
    end

    print "Pausing..."; STDOUT.flush
    sleep 1
    puts "done."
  rescue SignalException, SystemExit
    # Clean shutdown on signals: stop the reactor before exiting.
    EventMachine::stop_event_loop if EventMachine.reactor_running?
    exit
  rescue StandardError => e
    STDERR.puts "\nException #{e.message}:\n#{e.backtrace.join("\n")}\n\n"
    STDERR.puts "Waiting for a couple of minutes..."
    sleep 120
  end
end
# Start the Twitter stream worker; run blocks for the process lifetime.
# NOTE(review): TwitterStream presumably lives under lib/ now — confirm.
TwitterStream.new.run
1 change: 1 addition & 0 deletions config/initializers/delayed_job.rb
@@ -1,6 +1,7 @@
# delayed_job worker settings for Huginn's background jobs.
Delayed::Worker.destroy_failed_jobs = true   # remove jobs that exhaust their attempts
Delayed::Worker.max_attempts = 5
Delayed::Worker.max_run_time = 20.minutes
Delayed::Worker.read_ahead = 5               # NOTE(review): presumably jobs fetched per query — confirm against delayed_job docs
Delayed::Worker.default_priority = 10
Delayed::Worker.delay_jobs = !Rails.env.test?   # run jobs inline in the test environment

Expand Down
@@ -1,4 +1,2 @@
web: sudo bundle exec unicorn_rails -c config/unicorn.rb -E production
schedule: sudo RAILS_ENV=production bundle exec rails runner bin/schedule.rb
twitter: sudo RAILS_ENV=production bundle exec rails runner bin/twitter_stream.rb
dj: sudo RAILS_ENV=production bundle exec script/delayed_job run
jobs: sudo RAILS_ENV=production bundle exec rails runner bin/threaded.rb
87 changes: 87 additions & 0 deletions lib/huginn_scheduler.rb
@@ -0,0 +1,87 @@
require 'rufus/scheduler'

# Schedules Huginn's recurring background work on a rufus-scheduler
# instance: queues agent schedule runs, event propagation and expired
# event cleanup as delayed jobs. A mutex serializes the enqueueing.
class HuginnScheduler
  attr_accessor :mutex

  def initialize
    @rufus_scheduler = Rufus::Scheduler.new
  end

  # Stop the underlying rufus scheduler (used for clean shutdown).
  def stop
    @rufus_scheduler.stop
  end

  # Enqueue a delayed Agent.run_schedule for the given schedule label
  # (e.g. "every_5m", "midnight", "3pm" — see run! below).
  def run_schedule(time)
    with_mutex do
      puts "Queuing schedule for #{time}"
      Agent.delay.run_schedule(time)
    end
  end

  # Enqueue a delayed Agent.receive! to propagate pending events.
  def propagate!
    with_mutex do
      puts "Queuing event propagation"
      Agent.delay.receive!
    end
  end

  # Enqueue a delayed Event.cleanup_expired!.
  def cleanup_expired_events!
    with_mutex do
      puts "Running event cleanup"
      Event.delay.cleanup_expired!
    end
  end

  # Run the block while holding the mutex, with an ActiveRecord
  # connection checked out for its duration.
  def with_mutex
    ActiveRecord::Base.connection_pool.with_connection do
      mutex.synchronize do
        yield
      end
    end
  end

  # Register all recurring jobs and block on the scheduler. The
  # TIMEZONE env var (defaulting to Pacific) controls when the
  # cron-style jobs fire.
  def run!
    self.mutex = Mutex.new

    tzinfo_friendly_timezone = ActiveSupport::TimeZone::MAPPING[ENV['TIMEZONE'].present? ? ENV['TIMEZONE'] : "Pacific Time (US & Canada)"]

    # Schedule event propagation.

    @rufus_scheduler.every '1m' do
      propagate!
    end

    # Schedule event cleanup.

    @rufus_scheduler.cron "0 0 * * * " + tzinfo_friendly_timezone do
      cleanup_expired_events!
    end

    # Schedule repeating events.

    %w[1m 2m 5m 10m 30m 1h 2h 5h 12h 1d 2d 7d].each do |schedule|
      @rufus_scheduler.every schedule do
        run_schedule "every_#{schedule}"
      end
    end

    # Schedule events for specific times.

    # Times are assumed to be in PST for now. Can store a user#timezone later.
    24.times do |hour|
      @rufus_scheduler.cron "0 #{hour} * * * " + tzinfo_friendly_timezone do
        if hour == 0
          run_schedule "midnight"
        elsif hour < 12
          run_schedule "#{hour}am"
        elsif hour == 12
          run_schedule "noon"
        else
          run_schedule "#{hour - 12}pm"
        end
      end
    end

    # Block this thread until the scheduler shuts down.
    @rufus_scheduler.join
  end
end

0 comments on commit 577b95f

Please sign in to comment.