Permalink
Switch branches/tags
Nothing to show
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
executable file 191 lines (156 sloc) 4.32 KB
#!/usr/bin/env ruby
require 'rubygems'
require 'set'
require 'thread'
require 'chatterbot'
require 'stellarjob'
# Long-running worker for the stellarjob bot. It receives tweets (from the
# streaming client and from the replies backlog), stores and interprets them,
# and periodically processes the trust lines of the stellarjob account.
#
# Tweet handling and trust-line processing both run inside @work_semaphore,
# so they never execute concurrently with each other.
class Stellarjob::Runner
  def initialize
    # Serializes #handle_tweet and #process_lines across the worker threads.
    @work_semaphore = Mutex.new
  end

  # Repeatedly runs #catchup_backlog, pausing 10 seconds between passes.
  #
  # When Twitter reports TooManyRequests the pass is retried after sleeping
  # 2 ** (tries - 1) minutes. The backoff is reset after every successful
  # pass so that one burst of rate limiting does not inflate later delays.
  #
  # NOTE(review): nothing in this file calls this method; #run starts
  # #catchup_backlog directly. Confirm which of the two is intended.
  def catchup_backlog_all
    tries = 1
    loop do
      begin
        catchup_backlog
        tries = 1
      rescue Twitter::Error::TooManyRequests
        back_off('Searching', 2 ** (tries - 1))
        tries += 1
        retry
      end
      sleep(10)
    end
  end

  # Runs #stream, restarting it with exponential backoff (2 ** tries minutes)
  # when it fails with a JSON::ParserError whose message mentions
  # "Exceeded connection limit". Any other JSON::ParserError is re-raised.
  def stream_all
    tries = 1
    begin
      stream
    rescue JSON::ParserError => e
      raise unless e.to_s =~ /Exceeded connection limit/
      back_off('Streaming', 2 ** tries)
      tries += 1
      retry
    end
  end

  # Runs #process_lines forever, pausing 5 seconds between passes.
  def process_all_lines
    loop do
      process_lines
      sleep 5
    end
  end

  # Returns the bot object exposed by Stellarjob::Twitter.
  def bot
    Stellarjob::Twitter.bot
  end

  # Starts the streaming, backlog and trust-line threads and blocks until all
  # of them have finished. Thread.abort_on_exception is switched on globally,
  # so an unhandled exception in any thread aborts the whole process.
  def run
    Thread.abort_on_exception = true
    workers = [
      Thread.new { stream_all },
      Thread.new { catchup_backlog },
      Thread.new { process_all_lines }
    ]
    workers.each(&:join)
  end

  # Stores a tweet and hands it to #interpret.
  #
  # Tweets sent by the configured twitter_username are skipped unless their
  # text starts with '@stellarjob -f '. A Mongo::OperationFailure with error
  # code 11000 is treated as "tweet already stored" and the tweet is skipped;
  # any other OperationFailure is re-raised.
  #
  # tweet  - object responding to #id, #text and #user (with #username).
  # source - label used only in the log line (e.g. 'streaming', 'search').
  def handle_tweet(tweet, source)
    puts "Received tweet via #{source}"
    if tweet.user.username == Stellarjob::Config.config[:twitter_username] &&
       !tweet.text.start_with?('@stellarjob -f ')
      puts "Skipping my own tweet: #{tweet.text} (#{tweet.id})"
      return
    end

    @work_semaphore.synchronize do
      begin
        Stellarjob::Model::Tweet.create(
          tweet_id: tweet.id,
          text: tweet.text,
          sender: tweet.user.username
        )
      rescue Mongo::OperationFailure => e
        raise unless e.error_code == 11000
        puts "Skipping duplicate tweet from #{tweet.user.username} (#{tweet.id})"
        return
      end
      interpret(tweet)
    end
  end

  # Consumes the user stream. Twitter::Tweet events that are not retweets are
  # passed to #handle_tweet; retweets and all other events are only logged.
  def stream
    bot.streaming_client.user(stall_warnings: 'false') do |event|
      case event
      when Twitter::Tweet
        unless event.retweeted_status.nil?
          puts "Retweeted tweet: #{event.text} (#{event.id}) #{event.retweeted_status}"
          next
        end
        handle_tweet(event, 'streaming')
      else
        puts "Received other event: #{event}"
      end
    end
  end

  # Passes every tweet yielded by bot.replies to #handle_tweet, then calls
  # bot.update_config.
  def catchup_backlog
    bot.replies do |tweet|
      handle_tweet(tweet, 'search')
    end
    bot.update_config
  end

  # Dispatches a tweet to the matching command.
  #
  # "++@user reason" (e.g. "@stellarjob: +++@thegdb for being awesome") goes
  # to GivePoint; a tweet containing "yes" (case-insensitive) that is a reply
  # to another tweet goes to LinkAttemptReply; anything else is logged.
  def interpret(tweet)
    text = tweet.text
    sender = tweet.user.username

    give_point = /\+\+\s*@(\w+)\s*(.*)\z/.match(text)
    if give_point
      recipient, reason = give_point.captures
      Stellarjob::Command::GivePoint.handle(tweet, sender, recipient, reason)
    elsif text =~ /yes/i && tweet.in_reply_to_tweet_id
      Stellarjob::Command::LinkAttemptReply.handle(tweet, sender, tweet.in_reply_to_tweet_id)
    else
      puts "Could not parse tweet: #{text}"
    end
  end

  # Fetches the account lines of the stellarjob account and processes them
  # while holding @work_semaphore. Raises KeyError (from Hash#fetch) when the
  # response lacks 'result' or 'lines'.
  def process_lines
    raw = Stellarjob::Stellar.account_lines(Stellarjob::Stellar.stellarjob_account_id)
    lines = raw.fetch('result').fetch('lines')
    puts "Processing #{lines.length} total trust lines"
    @work_semaphore.synchronize do
      process_each_line(lines)
    end
  end

  # Processes the given trust lines against the stored Line records.
  #
  # For every distinct [account, limit] pair of a '+++' line that is not yet
  # marked processed, calls #prepare on the first active LinkAttempt whose
  # link_amount equals the limit (or logs that none exists), then marks the
  # pair processed. Afterwards deletes stored Line records whose
  # [account, amount] pair is no longer present.
  #
  # lines - enumerable of hashes read via 'currency', 'limit_peer', 'account'.
  def process_each_line(lines)
    account_limits = parse_account_limits(lines)

    # Process any new entries
    account_limits.each do |account, limit|
      next if Stellarjob::Model::Line.processed?(account, limit)

      link_attempt = Stellarjob::Model::LinkAttempt.first(link_amount: limit, active: true)
      if link_attempt
        link_attempt.prepare(account)
      else
        puts "Found a trust line without a corresponding active link attempt: #{account} / #{limit}"
      end
      Stellarjob::Model::Line.processed!(account, limit)
    end

    # Clear missing cache entries
    Stellarjob::Model::Line.each do |line|
      next if account_limits.include?([line.account, line.amount])
      puts "Deleting now-absent line: #{line}"
      line.delete
    end
  end

  private

  # Logs a rate-limit notice for +activity+ and sleeps for +minutes+ minutes.
  def back_off(activity, minutes)
    puts "#{activity} rate-limited; sleeping for #{minutes} minutes"
    sleep(minutes * 60)
  end

  # Builds the Set of [account, limit] pairs for the '+++' lines.
  #
  # Lines with another currency, or whose 'limit_peer' cannot be parsed as a
  # base-10 integer, contribute nothing. Collecting explicitly (rather than
  # map + next) keeps nil placeholders out of the set, which would otherwise
  # be handled as a bogus (nil, nil) trust line.
  def parse_account_limits(lines)
    pairs = lines.each_with_object([]) do |line, collected|
      next unless line['currency'] == '+++'
      begin
        limit = Integer(line['limit_peer'], 10)
      rescue ArgumentError => e
        puts "Could not parse integer from line #{line.inspect}: #{e}"
        next
      end
      collected << [line['account'], limit]
    end
    Set.new(pairs)
  end
end
# Disable output buffering on both standard streams before starting the
# runner, then initialize Stellarjob and run until the worker threads exit.
[$stdout, $stderr].each { |io| io.sync = true }
Stellarjob.init
runner = Stellarjob::Runner.new
runner.run