From 8a51dbc4bde76733362c36ddef632351324e62d0 Mon Sep 17 00:00:00 2001 From: Dominik Sander Date: Mon, 12 May 2014 00:04:43 +0200 Subject: [PATCH 1/3] Provide an optional threaded background worker Due to the three background workers (scheduler, twitter stream and delayed job) huginn needs a lot of memory to run (about 520MB on my dev machine). This PR introduces an optional threaded background worker which combines the three current separate processes into one (reducing the memory footprint to ~260MB). Since just one instance of every background processor is running at a time there should not be any threading related issues. The main gotcha of this is that it's most likely not possible to run multiple delayed job workers concurrently. The ultimate solution would probably be switching to sidekiq with sidetiq as scheduler, but that is a different task :) When running on MRI the GIL should not be an issue because it is released for most IO bound operations (waiting for the database/website/sleeping). 
--- Procfile | 4 + bin/schedule.rb | 82 ------------------- bin/threaded.rb | 57 +++++++++++++ bin/twitter_stream.rb | 113 +------------------------- config/initializers/delayed_job.rb | 1 + lib/huginn_scheduler.rb | 87 ++++++++++++++++++++ lib/twitter_stream.rb | 125 +++++++++++++++++++++++++++++ 7 files changed, 275 insertions(+), 194 deletions(-) create mode 100644 bin/threaded.rb create mode 100644 lib/huginn_scheduler.rb create mode 100644 lib/twitter_stream.rb diff --git a/Procfile b/Procfile index 86fee24777..a639152554 100644 --- a/Procfile +++ b/Procfile @@ -4,6 +4,10 @@ schedule: bundle exec rails runner bin/schedule.rb twitter: bundle exec rails runner bin/twitter_stream.rb dj: bundle exec script/delayed_job run +# Procfile for the exprimental threaded scheduler, twitter stream and delayed job +#web: bundle exec rails server +#jobs: bundle exec rails runner bin/threaded.rb + # Possible Profile configuration for production: # web: bundle exec unicorn -c config/unicorn/production.rb # schedule: bundle exec rails runner bin/schedule.rb diff --git a/bin/schedule.rb b/bin/schedule.rb index 43e06d0ce3..7a29fd4c0d 100755 --- a/bin/schedule.rb +++ b/bin/schedule.rb @@ -11,87 +11,5 @@ exit 1 end -require 'rufus/scheduler' - -class HuginnScheduler - attr_accessor :mutex - - def run_schedule(time) - with_mutex do - puts "Queuing schedule for #{time}" - Agent.delay.run_schedule(time) - end - end - - def propagate! - with_mutex do - puts "Queuing event propagation" - Agent.delay.receive! - end - end - - def cleanup_expired_events! - with_mutex do - puts "Running event cleanup" - Event.delay.cleanup_expired! - end - end - - def with_mutex - ActiveRecord::Base.connection_pool.with_connection do - mutex.synchronize do - yield - end - end - end - - def run! - self.mutex = Mutex.new - - rufus_scheduler = Rufus::Scheduler.new - - tzinfo_friendly_timezone = ActiveSupport::TimeZone::MAPPING[ENV['TIMEZONE'].present? ? 
ENV['TIMEZONE'] : "Pacific Time (US & Canada)"] - - # Schedule event propagation. - - rufus_scheduler.every '1m' do - propagate! - end - - # Schedule event cleanup. - - rufus_scheduler.cron "0 0 * * * " + tzinfo_friendly_timezone do - cleanup_expired_events! - end - - # Schedule repeating events. - - %w[1m 2m 5m 10m 30m 1h 2h 5h 12h 1d 2d 7d].each do |schedule| - rufus_scheduler.every schedule do - run_schedule "every_#{schedule}" - end - end - - # Schedule events for specific times. - - # Times are assumed to be in PST for now. Can store a user#timezone later. - 24.times do |hour| - rufus_scheduler.cron "0 #{hour} * * * " + tzinfo_friendly_timezone do - if hour == 0 - run_schedule "midnight" - elsif hour < 12 - run_schedule "#{hour}am" - elsif hour == 12 - run_schedule "noon" - else - run_schedule "#{hour - 12}pm" - end - end - end - - rufus_scheduler.join - end -end - scheduler = HuginnScheduler.new scheduler.run! \ No newline at end of file diff --git a/bin/threaded.rb b/bin/threaded.rb new file mode 100644 index 0000000000..8d44dd7272 --- /dev/null +++ b/bin/threaded.rb @@ -0,0 +1,57 @@ +require 'thread' + +def stop + puts 'Exiting...' + @scheduler.stop + @dj.stop + @stream.stop +end + +def safely(&block) + begin + yield block + rescue StandardError => e + STDERR.puts "\nException #{e.message}:\n#{e.backtrace.join("\n")}\n\n" + STDERR.puts "Terminating myself ..." + stop + end +end + +threads = [] +threads << Thread.new do + safely do + @stream = TwitterStream.new + @stream.run + puts "Twitter stream stopped ..." + end +end + +threads << Thread.new do + safely do + @scheduler = HuginnScheduler.new + @scheduler.run! + puts "Scheduler stopped ..." + end +end + +threads << Thread.new do + safely do + require 'delayed/command' + @dj = Delayed::Worker.new + @dj.start + puts "Delayed job stopped ..." 
+ end +end + +# We need to wait a bit to let delayed_job set it's traps so we can override them +sleep 0.5 + +trap('TERM') do + stop +end + +trap('INT') do + stop +end + +threads.collect { |t| t.join } diff --git a/bin/twitter_stream.rb b/bin/twitter_stream.rb index 945f4a3286..504ce97a5c 100755 --- a/bin/twitter_stream.rb +++ b/bin/twitter_stream.rb @@ -12,115 +12,4 @@ exit 1 end -require 'cgi' -require 'json' -require 'twitter/json_stream' -require 'em-http-request' -require 'pp' - -def stream!(filters, agent, &block) - stream = Twitter::JSONStream.connect( - :path => "/1/statuses/#{(filters && filters.length > 0) ? 'filter' : 'sample'}.json#{"?track=#{filters.map {|f| CGI::escape(f) }.join(",")}" if filters && filters.length > 0}", - :ssl => true, - :oauth => { - :consumer_key => agent.twitter_consumer_key, - :consumer_secret => agent.twitter_consumer_secret, - :access_key => agent.twitter_oauth_token, - :access_secret => agent.twitter_oauth_token_secret - } - ) - - stream.each_item do |status| - status = JSON.parse(status) if status.is_a?(String) - next unless status - next if status.has_key?('delete') - next unless status['text'] - status['text'] = status['text'].gsub(/</, "<").gsub(/>/, ">").gsub(/[\t\n\r]/, ' ') - block.call(status) - end - - stream.on_error do |message| - STDERR.puts " --> Twitter error: #{message} <--" - end - - stream.on_no_data do |message| - STDERR.puts " --> Got no data for awhile; trying to reconnect." - EventMachine::stop_event_loop - end - - stream.on_max_reconnects do |timeout, retries| - STDERR.puts " --> Oops, tried too many times! 
<--" - EventMachine::stop_event_loop - end -end - -def load_and_run(agents) - agents.group_by { |agent| agent.twitter_oauth_token }.each do |oauth_token, agents| - filter_to_agent_map = agents.map { |agent| agent.options[:filters] }.flatten.uniq.compact.map(&:strip).inject({}) { |m, f| m[f] = []; m } - - agents.each do |agent| - agent.options[:filters].flatten.uniq.compact.map(&:strip).each do |filter| - filter_to_agent_map[filter] << agent - end - end - - recent_tweets = [] - - stream!(filter_to_agent_map.keys, agents.first) do |status| - if status["retweeted_status"].present? && status["retweeted_status"].is_a?(Hash) - puts "Skipping retweet: #{status["text"]}" - elsif recent_tweets.include?(status["id_str"]) - puts "Skipping duplicate tweet: #{status["text"]}" - else - recent_tweets << status["id_str"] - recent_tweets.shift if recent_tweets.length > DUPLICATE_DETECTION_LENGTH - puts status["text"] - filter_to_agent_map.keys.each do |filter| - if (filter.downcase.split(SEPARATOR) - status["text"].downcase.split(SEPARATOR)).reject(&:empty?) == [] # Hacky McHackerson - filter_to_agent_map[filter].each do |agent| - puts " -> #{agent.name}" - agent.process_tweet(filter, status) - end - end - end - end - end - end -end - -RELOAD_TIMEOUT = 10.minutes -DUPLICATE_DETECTION_LENGTH = 1000 -SEPARATOR = /[^\w_\-]+/ - -while true - begin - agents = Agents::TwitterStreamAgent.all - - EventMachine::run do - EventMachine.add_periodic_timer(RELOAD_TIMEOUT) { - puts "Reloading EventMachine and all Agents..." - EventMachine::stop_event_loop - } - - if agents.length == 0 - puts "No agents found. Will look again in a minute." - sleep 60 - EventMachine::stop_event_loop - else - puts "Found #{agents.length} agent(s). Loading them now..." - load_and_run agents - end - end - - print "Pausing..."; STDOUT.flush - sleep 1 - puts "done." - rescue SignalException, SystemExit - EventMachine::stop_event_loop if EventMachine.reactor_running? 
- exit - rescue StandardError => e - STDERR.puts "\nException #{e.message}:\n#{e.backtrace.join("\n")}\n\n" - STDERR.puts "Waiting for a couple of minutes..." - sleep 120 - end -end \ No newline at end of file +TwitterStream.new.run \ No newline at end of file diff --git a/config/initializers/delayed_job.rb b/config/initializers/delayed_job.rb index e9560a5969..084d4b9333 100644 --- a/config/initializers/delayed_job.rb +++ b/config/initializers/delayed_job.rb @@ -1,6 +1,7 @@ Delayed::Worker.destroy_failed_jobs = true Delayed::Worker.max_attempts = 5 Delayed::Worker.max_run_time = 20.minutes +Delayed::Worker.read_ahead = 5 Delayed::Worker.default_priority = 10 Delayed::Worker.delay_jobs = !Rails.env.test? diff --git a/lib/huginn_scheduler.rb b/lib/huginn_scheduler.rb new file mode 100644 index 0000000000..fa72b7ea49 --- /dev/null +++ b/lib/huginn_scheduler.rb @@ -0,0 +1,87 @@ +require 'rufus/scheduler' + +class HuginnScheduler + attr_accessor :mutex + + def initialize + @rufus_scheduler = Rufus::Scheduler.new + end + + def stop + @rufus_scheduler.stop + end + + def run_schedule(time) + with_mutex do + puts "Queuing schedule for #{time}" + Agent.delay.run_schedule(time) + end + end + + def propagate! + with_mutex do + puts "Queuing event propagation" + Agent.delay.receive! + end + end + + def cleanup_expired_events! + with_mutex do + puts "Running event cleanup" + Event.delay.cleanup_expired! + end + end + + def with_mutex + ActiveRecord::Base.connection_pool.with_connection do + mutex.synchronize do + yield + end + end + end + + def run! + self.mutex = Mutex.new + + tzinfo_friendly_timezone = ActiveSupport::TimeZone::MAPPING[ENV['TIMEZONE'].present? ? ENV['TIMEZONE'] : "Pacific Time (US & Canada)"] + + # Schedule event propagation. + + @rufus_scheduler.every '1m' do + propagate! + end + + # Schedule event cleanup. + + @rufus_scheduler.cron "0 0 * * * " + tzinfo_friendly_timezone do + cleanup_expired_events! + end + + # Schedule repeating events. 
+ + %w[1m 2m 5m 10m 30m 1h 2h 5h 12h 1d 2d 7d].each do |schedule| + @rufus_scheduler.every schedule do + run_schedule "every_#{schedule}" + end + end + + # Schedule events for specific times. + + # Times are assumed to be in PST for now. Can store a user#timezone later. + 24.times do |hour| + @rufus_scheduler.cron "0 #{hour} * * * " + tzinfo_friendly_timezone do + if hour == 0 + run_schedule "midnight" + elsif hour < 12 + run_schedule "#{hour}am" + elsif hour == 12 + run_schedule "noon" + else + run_schedule "#{hour - 12}pm" + end + end + end + + @rufus_scheduler.join + end +end diff --git a/lib/twitter_stream.rb b/lib/twitter_stream.rb new file mode 100644 index 0000000000..3aac876a9d --- /dev/null +++ b/lib/twitter_stream.rb @@ -0,0 +1,125 @@ +require 'cgi' +require 'json' +require 'twitter/json_stream' +require 'em-http-request' +require 'pp' + +class TwitterStream + def initialize + @running = true + end + + def stop + @running = false + EventMachine::stop_event_loop if EventMachine.reactor_running? + end + + def stream!(filters, agent, &block) + stream = Twitter::JSONStream.connect( + :path => "/1/statuses/#{(filters && filters.length > 0) ? 'filter' : 'sample'}.json#{"?track=#{filters.map {|f| CGI::escape(f) }.join(",")}" if filters && filters.length > 0}", + :ssl => true, + :oauth => { + :consumer_key => agent.twitter_consumer_key, + :consumer_secret => agent.twitter_consumer_secret, + :access_key => agent.twitter_oauth_token, + :access_secret => agent.twitter_oauth_token_secret + } + ) + + stream.each_item do |status| + status = JSON.parse(status) if status.is_a?(String) + next unless status + next if status.has_key?('delete') + next unless status['text'] + status['text'] = status['text'].gsub(/</, "<").gsub(/>/, ">").gsub(/[\t\n\r]/, ' ') + block.call(status) + end + + stream.on_error do |message| + STDERR.puts " --> Twitter error: #{message} <--" + end + + stream.on_no_data do |message| + STDERR.puts " --> Got no data for awhile; trying to reconnect." 
+ EventMachine::stop_event_loop + end + + stream.on_max_reconnects do |timeout, retries| + STDERR.puts " --> Oops, tried too many times! <--" + EventMachine::stop_event_loop + end + end + + def load_and_run(agents) + agents.group_by { |agent| agent.twitter_oauth_token }.each do |oauth_token, agents| + filter_to_agent_map = agents.map { |agent| agent.options[:filters] }.flatten.uniq.compact.map(&:strip).inject({}) { |m, f| m[f] = []; m } + + agents.each do |agent| + agent.options[:filters].flatten.uniq.compact.map(&:strip).each do |filter| + filter_to_agent_map[filter] << agent + end + end + + recent_tweets = [] + + stream!(filter_to_agent_map.keys, agents.first) do |status| + if status["retweeted_status"].present? && status["retweeted_status"].is_a?(Hash) + puts "Skipping retweet: #{status["text"]}" + elsif recent_tweets.include?(status["id_str"]) + puts "Skipping duplicate tweet: #{status["text"]}" + else + recent_tweets << status["id_str"] + recent_tweets.shift if recent_tweets.length > DUPLICATE_DETECTION_LENGTH + puts status["text"] + filter_to_agent_map.keys.each do |filter| + if (filter.downcase.split(SEPARATOR) - status["text"].downcase.split(SEPARATOR)).reject(&:empty?) == [] # Hacky McHackerson + filter_to_agent_map[filter].each do |agent| + puts " -> #{agent.name}" + agent.process_tweet(filter, status) + end + end + end + end + end + end + end + + RELOAD_TIMEOUT = 10.minutes + DUPLICATE_DETECTION_LENGTH = 1000 + SEPARATOR = /[^\w_\-]+/ + + def run + while @running + begin + agents = Agents::TwitterStreamAgent.all + + EventMachine::run do + EventMachine.add_periodic_timer(RELOAD_TIMEOUT) { + puts "Reloading EventMachine and all Agents..." + EventMachine::stop_event_loop + } + + if agents.length == 0 + puts "No agents found. Will look again in a minute." + sleep 60 + EventMachine::stop_event_loop + else + puts "Found #{agents.length} agent(s). Loading them now..." 
+ load_and_run agents + end + end + + print "Pausing..."; STDOUT.flush + sleep 1 + puts "done." + rescue SignalException, SystemExit + @running = false + EventMachine::stop_event_loop if EventMachine.reactor_running? + rescue StandardError => e + STDERR.puts "\nException #{e.message}:\n#{e.backtrace.join("\n")}\n\n" + STDERR.puts "Waiting for a couple of minutes..." + sleep 120 + end + end + end +end \ No newline at end of file From ec0ec300c5a81067763c3bba8919eee599339c0b Mon Sep 17 00:00:00 2001 From: Dominik Sander Date: Mon, 12 May 2014 23:16:41 +0200 Subject: [PATCH 2/3] Use threaded workers in the chef production Procfile --- .../site-cookbooks/huginn_production/files/default/Procfile | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/deployment/site-cookbooks/huginn_production/files/default/Procfile b/deployment/site-cookbooks/huginn_production/files/default/Procfile index 295bbe440e..fcc4261188 100644 --- a/deployment/site-cookbooks/huginn_production/files/default/Procfile +++ b/deployment/site-cookbooks/huginn_production/files/default/Procfile @@ -1,4 +1,2 @@ web: sudo bundle exec unicorn_rails -c config/unicorn.rb -E production -schedule: sudo RAILS_ENV=production bundle exec rails runner bin/schedule.rb -twitter: sudo RAILS_ENV=production bundle exec rails runner bin/twitter_stream.rb -dj: sudo RAILS_ENV=production bundle exec script/delayed_job run +jobs: sudo RAILS_ENV=production bundle exec rails runner bin/threaded.rb \ No newline at end of file From df9032272c526697cbd68a9acbcd8ba84de8cf8a Mon Sep 17 00:00:00 2001 From: Dominik Sander Date: Sun, 1 Jun 2014 12:15:50 +0200 Subject: [PATCH 3/3] Threaded background worker now is the default --- Procfile | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/Procfile b/Procfile index a639152554..aeb2ac4c03 100644 --- a/Procfile +++ b/Procfile @@ -1,15 +1,13 @@ -# Procfile for development: +# Procfile for development using the new threaded worker 
(scheduler, twitter stream and delayed job) web: bundle exec rails server -schedule: bundle exec rails runner bin/schedule.rb -twitter: bundle exec rails runner bin/twitter_stream.rb -dj: bundle exec script/delayed_job run - -# Procfile for the exprimental threaded scheduler, twitter stream and delayed job -#web: bundle exec rails server -#jobs: bundle exec rails runner bin/threaded.rb +jobs: bundle exec rails runner bin/threaded.rb # Possible Profile configuration for production: # web: bundle exec unicorn -c config/unicorn/production.rb -# schedule: bundle exec rails runner bin/schedule.rb -# twitter: bundle exec rails runner bin/twitter_stream.rb -# dj: bundle exec script/delayed_job run +# jobs: bundle exec rails runner bin/threaded.rb + +# Old version with separate processes (use this if you have issues with the threaded version) +#web: bundle exec rails server +#schedule: bundle exec rails runner bin/schedule.rb +#twitter: bundle exec rails runner bin/twitter_stream.rb +#dj: bundle exec script/delayed_job run \ No newline at end of file