Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Twitter agents #3230

Merged
merged 1 commit into from Mar 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
95 changes: 90 additions & 5 deletions app/concerns/twitter_concern.rb
Expand Up @@ -7,15 +7,23 @@ module TwitterConcern
validate :validate_twitter_options
valid_oauth_providers :twitter

gem_dependency_check { defined?(Twitter) && Devise.omniauth_providers.include?(:twitter) && ENV['TWITTER_OAUTH_KEY'].present? && ENV['TWITTER_OAUTH_SECRET'].present? }
gem_dependency_check {
defined?(Twitter) &&
Devise.omniauth_providers.include?(:twitter) &&
ENV['TWITTER_OAUTH_KEY'].present? &&
ENV['TWITTER_OAUTH_SECRET'].present?
}
end

# Validation hook: records a single :base error unless all four Twitter OAuth
# credentials (consumer key/secret and OAuth token/secret) are present.
def validate_twitter_options
  # The && chain short-circuits, so later credentials are only looked up
  # when the earlier ones are present.
  return if twitter_consumer_key.present? &&
            twitter_consumer_secret.present? &&
            twitter_oauth_token.present? &&
            twitter_oauth_token_secret.present?

  errors.add(
    :base,
    "Twitter consumer_key, consumer_secret, oauth_token, and oauth_token_secret are required to authenticate with the Twitter API. You can provide these as options to this Agent, or as Credentials with the same names, but starting with 'twitter_'."
  )
end

Expand Down Expand Up @@ -44,6 +52,56 @@ def twitter
end
end

# HTML entities that arrive escaped in tweet text, mapped to the characters
# they stand for. The ampersand key must be the escaped form ("&amp;"): a bare
# "&" alternative would match first in the union below and leave "&lt;" and
# "&gt;" undecoded.
HTML_ENTITIES = {
  '&amp;' => '&',
  '&lt;' => '<',
  '&gt;' => '>',
}.freeze
# Matches any key of HTML_ENTITIES; use as gsub(RE_HTML_ENTITIES, HTML_ENTITIES)
# so every entity is decoded in a single pass (no double decoding).
RE_HTML_ENTITIES = Regexp.union(HTML_ENTITIES.keys).freeze

# Normalizes a tweet into an attributes Hash whose text has HTML entities
# decoded, and adds an :expanded_text field in which each URL entity's
# character range is replaced by its :expanded_url.
#
# @param tweet [Twitter::Tweet, Hash] a tweet object (its #attrs are used) or
#   an attributes Hash; a Hash without an :id symbol key is deep-symbolized.
# @return [Hash] a new Hash with the decoded text stored under the same key it
#   was read from (:full_text is preferred over :text) plus :expanded_text;
#   the attributes are returned unchanged when neither text key has a value.
# @raise [TypeError] if tweet is neither a Twitter::Tweet nor a Hash.
def format_tweet(tweet)
  attrs =
    case tweet
    when Twitter::Tweet
      tweet.attrs
    when Hash
      tweet.key?(:id) ? tweet : tweet.deep_symbolize_keys
    else
      raise TypeError, "Unexpected tweet type: #{tweet.class}"
    end

  # Decide the key once, so the value that is read and the key that is
  # written back always agree (even when both :full_text and :text exist).
  text_key = attrs[:full_text] ? :full_text : :text
  raw_text = attrs[text_key]
  return attrs unless raw_text

  expanded_text = raw_text.dup
  # Walk the entities back to front so that replacing a later range does not
  # shift the indices of the ranges that come before it.
  attrs.dig(:entities, :urls)&.reverse_each do |entity|
    from, to = entity[:indices]
    url = entity[:expanded_url]
    # A nil...nil range would replace the whole string and a nil URL would
    # raise, so incomplete entities are skipped.
    next unless from && to && url

    expanded_text[from...to] = url
  end

  # Indices refer to the still-encoded text, so entities are decoded only
  # after the URLs have been substituted. Non-bang gsub leaves the caller's
  # strings untouched.
  {
    **attrs,
    text_key => raw_text.gsub(RE_HTML_ENTITIES, HTML_ENTITIES),
    expanded_text: expanded_text.gsub(RE_HTML_ENTITIES, HTML_ENTITIES),
  }
end

module_function :format_tweet

module ClassMethods
def twitter_dependencies_missing
if ENV['TWITTER_OAUTH_KEY'].blank? || ENV['TWITTER_OAUTH_SECRET'].blank?
Expand All @@ -52,6 +110,33 @@ def twitter_dependencies_missing
"## Include the `twitter`, `omniauth-twitter`, and `cantino-twitter-stream` gems in your Gemfile to use Twitter Agents."
end
end

# Builds the sample-event snippet that agents interpolate into their
# `event_description` text.
#
# text_key     - name of the JSON key shown as holding the tweet text; it is
#                interpolated into the sample field and into the note about
#                "expanded_text".
# extra_fields - optional String placed at the very start of the sample
#                object, indented by 2 via String#indent; skipped when nil.
#
# Returns the heredoc text passed through String#indent(4).
# NOTE(review): String#indent is presumably ActiveSupport's core extension,
# and the 4-column indent presumably renders as a Markdown code block — confirm.
def tweet_event_description(text_key, extra_fields = nil)
<<~MD.indent(4)
{
#{extra_fields&.indent(2)}// ... every Tweet field, including ...
// Huginn automatically decodes "&lt;", "&gt;", and "&amp;" to "<", ">", and "&".
"#{text_key}": "something https://t.co/XXXX",
"user": {
"name": "Mr. Someone",
"screen_name": "Someone",
"location": "Vancouver BC Canada",
"description": "...",
"followers_count": 486,
"friends_count": 1983,
"created_at": "Mon Aug 29 23:38:14 +0000 2011",
"time_zone": "Pacific Time (US & Canada)",
"statuses_count": 3807,
"lang": "en"
},
"retweet_count": 0,
"entities": ...
"lang": "en",
// Huginn adds this field, expanding all shortened t.co URLs in "#{text_key}".
"expanded_text": "something https://example.org/foo/bar"
}
MD
end
end
end

Expand Down
7 changes: 5 additions & 2 deletions app/models/agents/twitter_action_agent.rb
Expand Up @@ -7,7 +7,7 @@ class TwitterActionAgent < Agent
description <<-MD
The Twitter Action Agent is able to retweet or favorite tweets from the events it receives.

#{ twitter_dependencies_missing if dependencies_missing? }
#{twitter_dependencies_missing if dependencies_missing?}

It expects to consume events generated by twitter agents where the payload is a hash of tweet information. The existing TwitterStreamAgent is one example of a valid event producer for this Agent.

Expand Down Expand Up @@ -83,7 +83,10 @@ def receive(incoming_events)

# Builds one Twitter::Tweet per incoming event from the event's payload.
#
# The tweet text is taken from the first payload key that has a value, in
# this order: "expanded_text", "full_text", "text".
#
# @param events [Enumerable] events responding to #payload (a Hash-like
#   object read with String keys)
# @return [Array<Twitter::Tweet>]
def tweets_from_events(events)
  events.map do |event|
    payload = event.payload
    Twitter::Tweet.new(
      id: payload["id"],
      text: payload["expanded_text"] || payload["full_text"] || payload["text"]
    )
  end
end
end
Expand Down
70 changes: 31 additions & 39 deletions app/models/agents/twitter_favorites.rb
Expand Up @@ -2,6 +2,7 @@ module Agents
class TwitterFavorites < Agent
include TwitterConcern

can_dry_run!
cannot_receive_events!

description <<-MD
Expand All @@ -14,31 +15,14 @@ class TwitterFavorites < Agent
You must also provide the `username` of the Twitter user, `number` of latest tweets to monitor and `history` as the number of tweets that will be held in memory.

Set `expected_update_period_in_days` to the maximum amount of time that you'd expect to pass between Events being created by this Agent.

Set `starting_at` to the date/time (eg. `Mon Jun 02 00:38:12 +0000 2014`) you want to start receiving tweets from (default: agent's `created_at`)
MD

event_description <<-MD
Events are the raw JSON provided by the [Twitter API](https://dev.twitter.com/docs/api/1.1/get/favorites/list). Should look something like:
{
... every Tweet field, including ...
"text": "something",
"user": {
"name": "Mr. Someone",
"screen_name": "Someone",
"location": "Vancouver BC Canada",
"description": "...",
"followers_count": 486,
"friends_count": 1983,
"created_at": "Mon Aug 29 23:38:14 +0000 2011",
"time_zone": "Pacific Time (US & Canada)",
"statuses_count": 3807,
"lang": "en"
},
"retweet_count": 0,
"entities": ...
"lang": "en"
}
event_description <<~MD
Events are the raw JSON provided by the [Twitter API v1.1](https://dev.twitter.com/docs/api/1.1/get/favorites/list) with slight modifications. They should look something like this:

#{tweet_event_description('full_text')}
MD

default_schedule "every_1h"
Expand All @@ -56,36 +40,44 @@ def default_options
}
end

# Validation hook for the favorites agent's options.
#
# Adds a :base error for each of username, number, history and
# expected_update_period_in_days that is not present, and one more when
# starting_at is given but cannot be parsed as a time.
def validate_options
  %i[username number history expected_update_period_in_days].each do |key|
    errors.add(:base, "#{key} is required") unless options[key].present?
  end

  return unless options[:starting_at].present?

  begin
    Time.parse(options[:starting_at])
  rescue StandardError
    errors.add(:base, "Error parsing starting_at")
  end
end

# Earliest tweet time this agent should report.
#
# @return [Time] the interpolated `starting_at` option when it is present and
#   parseable; otherwise `created_at`; otherwise the current time.
def starting_at
  configured = interpolated[:starting_at]
  parsed =
    if configured.present?
      begin
        Time.parse(configured)
      rescue StandardError
        nil # unparseable value: fall through to the defaults below
      end
    end

  parsed || created_at || Time.now # Time.now is for dry-running
end

# Fetches the configured user's favorites once and emits an event for every
# tweet that has not been seen before and is not older than `starting_at`.
#
# Seen tweet ids are remembered in memory[:last_seen]; after each addition
# the oldest id is dropped when more than `history` ids are held. Event
# payloads are produced by `format_tweet`.
def check
  opts = { count: interpolated[:number], tweet_mode: 'extended' }
  tweets = twitter.favorites(interpolated[:username], opts)
  memory[:last_seen] ||= []

  tweets.each do |tweet|
    next if memory[:last_seen].include?(tweet.id) || tweet.created_at < starting_at

    memory[:last_seen].push(tweet.id)
    memory[:last_seen].shift if memory[:last_seen].length > interpolated[:history].to_i
    create_event(payload: format_tweet(tweet))
  end
end
end
Expand Down
77 changes: 36 additions & 41 deletions app/models/agents/twitter_search_agent.rb
Expand Up @@ -2,6 +2,7 @@ module Agents
class TwitterSearchAgent < Agent
include TwitterConcern

can_dry_run!
cannot_receive_events!

description <<-MD
Expand All @@ -14,7 +15,7 @@ class TwitterSearchAgent < Agent
To be able to use this Agent you need to authenticate with Twitter in the [Services](/services) section first.

You must provide the desired `search`.

Set `result_type` to specify which [type of search results](https://dev.twitter.com/rest/reference/get/search/tweets) you would prefer to receive. Options are "mixed", "recent", and "popular". (default: `mixed`)

Set `max_results` to limit the number of results to retrieve per run (default: `500`). The API rate limit is ~18,000 per 15 minutes. [Click here to learn more about rate limits](https://dev.twitter.com/rest/public/rate-limiting).
Expand All @@ -24,34 +25,16 @@ class TwitterSearchAgent < Agent
Set `starting_at` to the date/time (eg. `Mon Jun 02 00:38:12 +0000 2014`) you want to start receiving tweets from (default: agent's `created_at`)
MD

event_description <<-MD
Events are the raw JSON provided by the [Twitter API](https://dev.twitter.com/rest/reference/get/search/tweets). Should look something like:

{
... every Tweet field, including ...
"text": "something",
"user": {
"name": "Mr. Someone",
"screen_name": "Someone",
"location": "Vancouver BC Canada",
"description": "...",
"followers_count": 486,
"friends_count": 1983,
"created_at": "Mon Aug 29 23:38:14 +0000 2011",
"time_zone": "Pacific Time (US & Canada)",
"statuses_count": 3807,
"lang": "en"
},
"retweet_count": 0,
"entities": ...
"lang": "en"
}
event_description <<~MD
Events are the raw JSON provided by the [Twitter API v1.1](https://developer.twitter.com/en/docs/twitter-api/v1/tweets/search/api-reference/get-search-tweets) with slight modifications. They should look something like this:

#{tweet_event_description('full_text')}
MD

default_schedule "every_1h"

# Whether the agent counts as working: an event was created within the
# interpolated `expected_update_period_in_days` and there are no recent
# error logs.
def working?
  event_created_within?(interpolated[:expected_update_period_in_days]) && !recent_error_logs?
end

def default_options
Expand All @@ -62,41 +45,53 @@ def default_options
end

# Validation hook for the search agent's options.
#
# Adds a :base error when `search` or `expected_update_period_in_days` is
# blank, and one more when `starting_at` is given but its interpolated value
# cannot be parsed as a time.
def validate_options
  errors.add(:base, "search is required") if options[:search].blank?

  if options[:expected_update_period_in_days].blank?
    errors.add(:base, "expected_update_period_in_days is required")
  end

  return if options[:starting_at].blank?

  begin
    Time.parse(interpolated[:starting_at])
  rescue StandardError
    errors.add(:base, "Error parsing starting_at")
  end
end

# Earliest tweet time this agent should report.
#
# @return [Time] the interpolated `starting_at` option when it is present and
#   parseable; otherwise `created_at`; otherwise the current time.
def starting_at
  configured = interpolated[:starting_at]
  parsed =
    if configured.present?
      begin
        Time.parse(configured)
      rescue StandardError
        nil # unparseable value: fall through to the defaults below
      end
    end

  parsed || created_at || Time.now # Time.now is for dry-running
end

# Maximum number of tweets to take per run.
#
# @return [Integer] the interpolated `max_results` option converted with
#   #to_i, or 500 when the option is blank.
def max_results
  (interpolated[:max_results].presence || 500).to_i
end

def check
since_id = memory['since_id'] || nil
opts = {include_entities: true, tweet_mode: 'extended'}
opts.merge! result_type: interpolated[:result_type] if interpolated[:result_type].present?
opts.merge! since_id: since_id unless since_id.nil?
opts = {
include_entities: true,
tweet_mode: 'extended',
result_type: interpolated[:result_type].presence,
since_id: memory[:since_id].presence,
}.compact

# http://www.rubydoc.info/gems/twitter/Twitter/REST/Search
tweets = twitter.search(interpolated['search'], opts).take(max_results)
tweets = twitter.search(interpolated[:search], opts).take(max_results)

tweets.each do |tweet|
if (tweet.created_at >= starting_at)
memory['since_id'] = tweet.id if !memory['since_id'] || (tweet.id > memory['since_id'])
next unless tweet.created_at >= starting_at

create_event payload: tweet.attrs
end
memory[:since_id] = [tweet.id, *memory[:since_id]].max

create_event(payload: format_tweet(tweet))
end

save!
Expand Down