diff --git a/Procfile b/Procfile
index 86fee24777..aeb2ac4c03 100644
--- a/Procfile
+++ b/Procfile
@@ -1,11 +1,13 @@
-# Procfile for development:
+# Procfile for development using the new threaded worker (scheduler, twitter stream and delayed job)
 web: bundle exec rails server
-schedule: bundle exec rails runner bin/schedule.rb
-twitter: bundle exec rails runner bin/twitter_stream.rb
-dj: bundle exec script/delayed_job run
+jobs: bundle exec rails runner bin/threaded.rb
 
 # Possible Profile configuration for production:
 # web: bundle exec unicorn -c config/unicorn/production.rb
-# schedule: bundle exec rails runner bin/schedule.rb
-# twitter: bundle exec rails runner bin/twitter_stream.rb
-# dj: bundle exec script/delayed_job run
+# jobs: bundle exec rails runner bin/threaded.rb
+
+# Old version with separate processes (use this if you have issues with the threaded version)
+#web: bundle exec rails server
+#schedule: bundle exec rails runner bin/schedule.rb
+#twitter: bundle exec rails runner bin/twitter_stream.rb
+#dj: bundle exec script/delayed_job run
\ No newline at end of file
diff --git a/bin/schedule.rb b/bin/schedule.rb
index 43e06d0ce3..7a29fd4c0d 100755
--- a/bin/schedule.rb
+++ b/bin/schedule.rb
@@ -11,87 +11,5 @@
   exit 1
 end
 
-require 'rufus/scheduler'
-
-class HuginnScheduler
-  attr_accessor :mutex
-
-  def run_schedule(time)
-    with_mutex do
-      puts "Queuing schedule for #{time}"
-      Agent.delay.run_schedule(time)
-    end
-  end
-
-  def propagate!
-    with_mutex do
-      puts "Queuing event propagation"
-      Agent.delay.receive!
-    end
-  end
-
-  def cleanup_expired_events!
-    with_mutex do
-      puts "Running event cleanup"
-      Event.delay.cleanup_expired!
-    end
-  end
-
-  def with_mutex
-    ActiveRecord::Base.connection_pool.with_connection do
-      mutex.synchronize do
-        yield
-      end
-    end
-  end
-
-  def run!
-    self.mutex = Mutex.new
-
-    rufus_scheduler = Rufus::Scheduler.new
-
-    tzinfo_friendly_timezone = ActiveSupport::TimeZone::MAPPING[ENV['TIMEZONE'].present? ? ENV['TIMEZONE'] : "Pacific Time (US & Canada)"]
-
-    # Schedule event propagation.
-
-    rufus_scheduler.every '1m' do
-      propagate!
-    end
-
-    # Schedule event cleanup.
-
-    rufus_scheduler.cron "0 0 * * * " + tzinfo_friendly_timezone do
-      cleanup_expired_events!
-    end
-
-    # Schedule repeating events.
-
-    %w[1m 2m 5m 10m 30m 1h 2h 5h 12h 1d 2d 7d].each do |schedule|
-      rufus_scheduler.every schedule do
-        run_schedule "every_#{schedule}"
-      end
-    end
-
-    # Schedule events for specific times.
-
-    # Times are assumed to be in PST for now. Can store a user#timezone later.
-    24.times do |hour|
-      rufus_scheduler.cron "0 #{hour} * * * " + tzinfo_friendly_timezone do
-        if hour == 0
-          run_schedule "midnight"
-        elsif hour < 12
-          run_schedule "#{hour}am"
-        elsif hour == 12
-          run_schedule "noon"
-        else
-          run_schedule "#{hour - 12}pm"
-        end
-      end
-    end
-
-    rufus_scheduler.join
-  end
-end
-
 scheduler = HuginnScheduler.new
 scheduler.run!
\ No newline at end of file
diff --git a/bin/threaded.rb b/bin/threaded.rb
new file mode 100644
index 0000000000..8d44dd7272
--- /dev/null
+++ b/bin/threaded.rb
@@ -0,0 +1,57 @@
+require 'thread'
+
+def stop
+  puts 'Exiting...'
+  @scheduler.stop
+  @dj.stop
+  @stream.stop
+end
+
+def safely(&block)
+  begin
+    yield block
+  rescue StandardError => e
+    STDERR.puts "\nException #{e.message}:\n#{e.backtrace.join("\n")}\n\n"
+    STDERR.puts "Terminating myself ..."
+    stop
+  end
+end
+
+threads = []
+threads << Thread.new do
+  safely do
+    @stream = TwitterStream.new
+    @stream.run
+    puts "Twitter stream stopped ..."
+  end
+end
+
+threads << Thread.new do
+  safely do
+    @scheduler = HuginnScheduler.new
+    @scheduler.run!
+    puts "Scheduler stopped ..."
+  end
+end
+
+threads << Thread.new do
+  safely do
+    require 'delayed/command'
+    @dj = Delayed::Worker.new
+    @dj.start
+    puts "Delayed job stopped ..."
+  end
+end
+
+# We need to wait a bit to let delayed_job set its traps so we can override them
+sleep 0.5
+
+trap('TERM') do
+  stop
+end
+
+trap('INT') do
+  stop
+end
+
+threads.collect { |t| t.join }
diff --git a/bin/twitter_stream.rb b/bin/twitter_stream.rb
index 945f4a3286..504ce97a5c 100755
--- a/bin/twitter_stream.rb
+++ b/bin/twitter_stream.rb
@@ -12,115 +12,4 @@
   exit 1
 end
 
-require 'cgi'
-require 'json'
-require 'twitter/json_stream'
-require 'em-http-request'
-require 'pp'
-
-def stream!(filters, agent, &block)
-  stream = Twitter::JSONStream.connect(
-    :path    => "/1/statuses/#{(filters && filters.length > 0) ? 'filter' : 'sample'}.json#{"?track=#{filters.map {|f| CGI::escape(f) }.join(",")}" if filters && filters.length > 0}",
-    :ssl     => true,
-    :oauth   => {
-      :consumer_key    => agent.twitter_consumer_key,
-      :consumer_secret => agent.twitter_consumer_secret,
-      :access_key      => agent.twitter_oauth_token,
-      :access_secret   => agent.twitter_oauth_token_secret
-    }
-  )
-
-  stream.each_item do |status|
-    status = JSON.parse(status) if status.is_a?(String)
-    next unless status
-    next if status.has_key?('delete')
-    next unless status['text']
-    status['text'] = status['text'].gsub(/&lt;/, "<").gsub(/&gt;/, ">").gsub(/[\t\n\r]/, ' ')
-    block.call(status)
-  end
-
-  stream.on_error do |message|
-    STDERR.puts " --> Twitter error: #{message} <--"
-  end
-
-  stream.on_no_data do |message|
-    STDERR.puts " --> Got no data for awhile; trying to reconnect."
-    EventMachine::stop_event_loop
-  end
-
-  stream.on_max_reconnects do |timeout, retries|
-    STDERR.puts " --> Oops, tried too many times! <--"
-    EventMachine::stop_event_loop
-  end
-end
-
-def load_and_run(agents)
-  agents.group_by { |agent| agent.twitter_oauth_token }.each do |oauth_token, agents|
-    filter_to_agent_map = agents.map { |agent| agent.options[:filters] }.flatten.uniq.compact.map(&:strip).inject({}) { |m, f| m[f] = []; m }
-
-    agents.each do |agent|
-      agent.options[:filters].flatten.uniq.compact.map(&:strip).each do |filter|
-        filter_to_agent_map[filter] << agent
-      end
-    end
-
-    recent_tweets = []
-
-    stream!(filter_to_agent_map.keys, agents.first) do |status|
-      if status["retweeted_status"].present? && status["retweeted_status"].is_a?(Hash)
-        puts "Skipping retweet: #{status["text"]}"
-      elsif recent_tweets.include?(status["id_str"])
-        puts "Skipping duplicate tweet: #{status["text"]}"
-      else
-        recent_tweets << status["id_str"]
-        recent_tweets.shift if recent_tweets.length > DUPLICATE_DETECTION_LENGTH
-        puts status["text"]
-        filter_to_agent_map.keys.each do |filter|
-          if (filter.downcase.split(SEPARATOR) - status["text"].downcase.split(SEPARATOR)).reject(&:empty?) == [] # Hacky McHackerson
-            filter_to_agent_map[filter].each do |agent|
-              puts " -> #{agent.name}"
-              agent.process_tweet(filter, status)
-            end
-          end
-        end
-      end
-    end
-  end
-end
-
-RELOAD_TIMEOUT = 10.minutes
-DUPLICATE_DETECTION_LENGTH = 1000
-SEPARATOR = /[^\w_\-]+/
-
-while true
-  begin
-    agents = Agents::TwitterStreamAgent.all
-
-    EventMachine::run do
-      EventMachine.add_periodic_timer(RELOAD_TIMEOUT) {
-        puts "Reloading EventMachine and all Agents..."
-        EventMachine::stop_event_loop
-      }
-
-      if agents.length == 0
-        puts "No agents found. Will look again in a minute."
-        sleep 60
-        EventMachine::stop_event_loop
-      else
-        puts "Found #{agents.length} agent(s). Loading them now..."
-        load_and_run agents
-      end
-    end
-
-    print "Pausing..."; STDOUT.flush
-    sleep 1
-    puts "done."
-  rescue SignalException, SystemExit
-    EventMachine::stop_event_loop if EventMachine.reactor_running?
-    exit
-  rescue StandardError => e
-    STDERR.puts "\nException #{e.message}:\n#{e.backtrace.join("\n")}\n\n"
-    STDERR.puts "Waiting for a couple of minutes..."
-    sleep 120
-  end
-end
\ No newline at end of file
+TwitterStream.new.run
\ No newline at end of file
diff --git a/config/initializers/delayed_job.rb b/config/initializers/delayed_job.rb
index e9560a5969..084d4b9333 100644
--- a/config/initializers/delayed_job.rb
+++ b/config/initializers/delayed_job.rb
@@ -1,6 +1,7 @@
 Delayed::Worker.destroy_failed_jobs = true
 Delayed::Worker.max_attempts = 5
 Delayed::Worker.max_run_time = 20.minutes
+Delayed::Worker.read_ahead = 5
 Delayed::Worker.default_priority = 10
 Delayed::Worker.delay_jobs = !Rails.env.test?
 
diff --git a/deployment/site-cookbooks/huginn_production/files/default/Procfile b/deployment/site-cookbooks/huginn_production/files/default/Procfile
index 295bbe440e..fcc4261188 100644
--- a/deployment/site-cookbooks/huginn_production/files/default/Procfile
+++ b/deployment/site-cookbooks/huginn_production/files/default/Procfile
@@ -1,4 +1,2 @@
 web: sudo bundle exec unicorn_rails -c config/unicorn.rb -E production
-schedule: sudo RAILS_ENV=production bundle exec rails runner bin/schedule.rb
-twitter: sudo RAILS_ENV=production bundle exec rails runner bin/twitter_stream.rb
-dj: sudo RAILS_ENV=production bundle exec script/delayed_job run
+jobs: sudo RAILS_ENV=production bundle exec rails runner bin/threaded.rb
\ No newline at end of file
diff --git a/lib/huginn_scheduler.rb b/lib/huginn_scheduler.rb
new file mode 100644
index 0000000000..fa72b7ea49
--- /dev/null
+++ b/lib/huginn_scheduler.rb
@@ -0,0 +1,87 @@
+require 'rufus/scheduler'
+
+class HuginnScheduler
+  attr_accessor :mutex
+
+  def initialize
+    @rufus_scheduler = Rufus::Scheduler.new
+  end
+
+  def stop
+    @rufus_scheduler.stop
+  end
+
+  def run_schedule(time)
+    with_mutex do
+      puts "Queuing schedule for #{time}"
+      Agent.delay.run_schedule(time)
+    end
+  end
+
+  def propagate!
+    with_mutex do
+      puts "Queuing event propagation"
+      Agent.delay.receive!
+    end
+  end
+
+  def cleanup_expired_events!
+    with_mutex do
+      puts "Running event cleanup"
+      Event.delay.cleanup_expired!
+    end
+  end
+
+  def with_mutex
+    ActiveRecord::Base.connection_pool.with_connection do
+      mutex.synchronize do
+        yield
+      end
+    end
+  end
+
+  def run!
+    self.mutex = Mutex.new
+
+    tzinfo_friendly_timezone = ActiveSupport::TimeZone::MAPPING[ENV['TIMEZONE'].present? ? ENV['TIMEZONE'] : "Pacific Time (US & Canada)"]
+
+    # Schedule event propagation.
+
+    @rufus_scheduler.every '1m' do
+      propagate!
+    end
+
+    # Schedule event cleanup.
+
+    @rufus_scheduler.cron "0 0 * * * " + tzinfo_friendly_timezone do
+      cleanup_expired_events!
+    end
+
+    # Schedule repeating events.
+
+    %w[1m 2m 5m 10m 30m 1h 2h 5h 12h 1d 2d 7d].each do |schedule|
+      @rufus_scheduler.every schedule do
+        run_schedule "every_#{schedule}"
+      end
+    end
+
+    # Schedule events for specific times.
+
+    # Times are assumed to be in PST for now. Can store a user#timezone later.
+    24.times do |hour|
+      @rufus_scheduler.cron "0 #{hour} * * * " + tzinfo_friendly_timezone do
+        if hour == 0
+          run_schedule "midnight"
+        elsif hour < 12
+          run_schedule "#{hour}am"
+        elsif hour == 12
+          run_schedule "noon"
+        else
+          run_schedule "#{hour - 12}pm"
+        end
+      end
+    end
+
+    @rufus_scheduler.join
+  end
+end
diff --git a/lib/twitter_stream.rb b/lib/twitter_stream.rb
new file mode 100644
index 0000000000..3aac876a9d
--- /dev/null
+++ b/lib/twitter_stream.rb
@@ -0,0 +1,125 @@
+require 'cgi'
+require 'json'
+require 'twitter/json_stream'
+require 'em-http-request'
+require 'pp'
+
+class TwitterStream
+  def initialize
+    @running = true
+  end
+
+  def stop
+    @running = false
+    EventMachine::stop_event_loop if EventMachine.reactor_running?
+  end
+
+  def stream!(filters, agent, &block)
+    stream = Twitter::JSONStream.connect(
+      :path    => "/1/statuses/#{(filters && filters.length > 0) ? 'filter' : 'sample'}.json#{"?track=#{filters.map {|f| CGI::escape(f) }.join(",")}" if filters && filters.length > 0}",
+      :ssl     => true,
+      :oauth   => {
+        :consumer_key    => agent.twitter_consumer_key,
+        :consumer_secret => agent.twitter_consumer_secret,
+        :access_key      => agent.twitter_oauth_token,
+        :access_secret   => agent.twitter_oauth_token_secret
+      }
+    )
+
+    stream.each_item do |status|
+      status = JSON.parse(status) if status.is_a?(String)
+      next unless status
+      next if status.has_key?('delete')
+      next unless status['text']
+      status['text'] = status['text'].gsub(/&lt;/, "<").gsub(/&gt;/, ">").gsub(/[\t\n\r]/, ' ')
+      block.call(status)
+    end
+
+    stream.on_error do |message|
+      STDERR.puts " --> Twitter error: #{message} <--"
+    end
+
+    stream.on_no_data do |message|
+      STDERR.puts " --> Got no data for awhile; trying to reconnect."
+      EventMachine::stop_event_loop
+    end
+
+    stream.on_max_reconnects do |timeout, retries|
+      STDERR.puts " --> Oops, tried too many times! <--"
+      EventMachine::stop_event_loop
+    end
+  end
+
+  def load_and_run(agents)
+    agents.group_by { |agent| agent.twitter_oauth_token }.each do |oauth_token, agents|
+      filter_to_agent_map = agents.map { |agent| agent.options[:filters] }.flatten.uniq.compact.map(&:strip).inject({}) { |m, f| m[f] = []; m }
+
+      agents.each do |agent|
+        agent.options[:filters].flatten.uniq.compact.map(&:strip).each do |filter|
+          filter_to_agent_map[filter] << agent
+        end
+      end
+
+      recent_tweets = []
+
+      stream!(filter_to_agent_map.keys, agents.first) do |status|
+        if status["retweeted_status"].present? && status["retweeted_status"].is_a?(Hash)
+          puts "Skipping retweet: #{status["text"]}"
+        elsif recent_tweets.include?(status["id_str"])
+          puts "Skipping duplicate tweet: #{status["text"]}"
+        else
+          recent_tweets << status["id_str"]
+          recent_tweets.shift if recent_tweets.length > DUPLICATE_DETECTION_LENGTH
+          puts status["text"]
+          filter_to_agent_map.keys.each do |filter|
+            if (filter.downcase.split(SEPARATOR) - status["text"].downcase.split(SEPARATOR)).reject(&:empty?) == [] # Hacky McHackerson
+              filter_to_agent_map[filter].each do |agent|
+                puts " -> #{agent.name}"
+                agent.process_tweet(filter, status)
+              end
+            end
+          end
+        end
+      end
+    end
+  end
+
+  RELOAD_TIMEOUT = 10.minutes
+  DUPLICATE_DETECTION_LENGTH = 1000
+  SEPARATOR = /[^\w_\-]+/
+
+  def run
+    while @running
+      begin
+        agents = Agents::TwitterStreamAgent.all
+
+        EventMachine::run do
+          EventMachine.add_periodic_timer(RELOAD_TIMEOUT) {
+            puts "Reloading EventMachine and all Agents..."
+            EventMachine::stop_event_loop
+          }
+
+          if agents.length == 0
+            puts "No agents found. Will look again in a minute."
+            sleep 60
+            EventMachine::stop_event_loop
+          else
+            puts "Found #{agents.length} agent(s). Loading them now..."
+            load_and_run agents
+          end
+        end
+
+        print "Pausing..."; STDOUT.flush
+        sleep 1
+        puts "done."
+      rescue SignalException, SystemExit
+        @running = false
+        EventMachine::stop_event_loop if EventMachine.reactor_running?
+      rescue StandardError => e
+        STDERR.puts "\nException #{e.message}:\n#{e.backtrace.join("\n")}\n\n"
+        STDERR.puts "Waiting for a couple of minutes..."
+        sleep 120
+      end
+    end
+  end
+end
\ No newline at end of file
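
For reference, here is a minimal, self-contained sketch of the supervision pattern bin/threaded.rb introduces: each worker runs inside a thread wrapped by a "safely" helper, the main process waits briefly so the workers can install their own signal traps, and then it overrides TERM/INT so that either a signal or an unhandled exception shuts everything down. The FakeWorker class, its names and the sleep intervals below are invented for illustration and are not part of this patch.

require 'thread'

# Stand-in for HuginnScheduler / TwitterStream / Delayed::Worker in this sketch.
class FakeWorker
  attr_reader :name

  def initialize(name)
    @name = name
    @running = true
  end

  def run
    puts "#{name} started"
    sleep 0.1 while @running
  end

  def stop
    @running = false
  end
end

def stop_all
  puts 'Exiting...'
  @workers.each(&:stop)
end

def safely
  yield
rescue StandardError => e
  STDERR.puts "Exception #{e.message}:\n#{e.backtrace.join("\n")}"
  STDERR.puts 'Terminating myself ...'
  stop_all
end

@workers = [FakeWorker.new('scheduler'), FakeWorker.new('stream')]

threads = @workers.map do |worker|
  Thread.new do
    safely do
      worker.run
      puts "#{worker.name} stopped"
    end
  end
end

# Wait a bit so workers that install their own traps (e.g. delayed_job) do so before we override them.
sleep 0.5
trap('TERM') { stop_all }
trap('INT')  { stop_all }

threads.each(&:join)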