-
-
Notifications
You must be signed in to change notification settings - Fork 3.7k
/
twitter_stream_agent.rb
128 lines (107 loc) · 4.34 KB
/
twitter_stream_agent.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
module Agents
  # Follows the Twitter stream for the configured `filters`.
  #
  # In `generate: events` mode every matching tweet becomes an Event (the tweet
  # payload plus a 'filter' key). In `generate: counts` mode matches are only
  # tallied in memory['filter_counts'], and `check` turns those tallies into
  # summary Events.
  class TwitterStreamAgent < Agent
    include TwitterConcern

    cannot_receive_events!

    description <<-MD
The TwitterStreamAgent follows the Twitter stream in real time, watching for certain keywords, or filters, that you provide.
To follow the Twitter stream, provide an array of `filters`. Multiple words in a filter must all show up in a tweet, but are independent of order.
If you provide an array instead of a filter, the first entry will be considered primary and any additional values will be treated as aliases.
Twitter credentials must be supplied as either [credentials](/user_credentials) called
`twitter_consumer_key`, `twitter_consumer_secret`, `twitter_oauth_token`, and `twitter_oauth_token_secret`,
or as options to this Agent called `consumer_key`, `consumer_secret`, `oauth_token`, and `oauth_token_secret`.
To get oAuth credentials for Twitter, [follow these instructions](https://github.com/cantino/huginn/wiki/Getting-a-twitter-oauth-token).
Set `expected_update_period_in_days` to the maximum amount of time that you'd expect to pass between Events being created by this Agent.
`generate` should be either `events` or `counts`. If set to `counts`, it will output event summaries whenever the Agent is scheduled.
    MD

    event_description <<-MD
When in `counts` mode, TwitterStreamAgent events look like:
{
"filter": "hello world",
"count": 25,
"time": 3456785456
}
When in `events` mode, TwitterStreamAgent events look like:
{
"filter": "selectorgadget",
... every Tweet field, including ...
"text": "something",
"user": {
"name": "Mr. Someone",
"screen_name": "Someone",
"location": "Vancouver BC Canada",
"description": "...",
"followers_count": 486,
"friends_count": 1983,
"created_at": "Mon Aug 29 23:38:14 +0000 2011",
"time_zone": "Pacific Time (US & Canada)",
"statuses_count": 3807,
"lang": "en"
},
"retweet_count": 0,
"entities": ...
"lang": "en"
}
    MD

    default_schedule "11pm"

    # Adds a single base error unless `filters`, `expected_update_period_in_days`
    # and `generate` are all present. The value of `generate` is not checked here.
    def validate_options
      required = %w[filters expected_update_period_in_days generate]
      return if required.all? { |key| options[key].present? }

      errors.add(:base, "expected_update_period_in_days, generate, and filters are required fields")
    end

    # The Agent is considered healthy when an Event was created within
    # `expected_update_period_in_days` and there are no recent error logs.
    def working?
      event_created_within?(options['expected_update_period_in_days']) && !recent_error_logs?
    end

    # Options pre-filled for a newly created Agent.
    def default_options
      {
        'filters' => %w[keyword1 keyword2],
        'expected_update_period_in_days' => "2",
        'generate' => "events"
      }
    end

    # Handles one tweet delivered for `filter`.
    #
    # The filter is first canonicalized with `lookup_filter`; tweets for filters
    # that are not configured are ignored. In `counts` mode the canonical
    # filter's tally in the persisted memory is incremented; otherwise an Event
    # is created from `status` with a 'filter' key added.
    #
    # @param filter the filter (or alias) the tweet matched
    # @param status [Hash] the tweet payload (assumed to be a Hash, since it is
    #   `merge`d) — TODO confirm against the stream caller
    # @return [nil] when the filter is unknown
    def process_tweet(filter, status)
      filter = lookup_filter(filter)
      return unless filter

      if options['generate'] == "counts"
        # Avoid memory pollution by reloading the Agent: the increment is applied
        # to a freshly loaded record and saved, rather than to this instance's memory.
        agent = Agent.find(id)
        agent.memory['filter_counts'] ||= {}
        agent.memory['filter_counts'][filter] ||= 0
        agent.memory['filter_counts'][filter] += 1
        remove_unused_keys!(agent, 'filter_counts')
        agent.save!
      else
        create_event :payload => status.merge('filter' => filter)
      end
    end

    # Scheduled run. In `counts` mode, emits one summary Event per entry in
    # memory['filter_counts'] (filter, count, current Unix time). Regardless of
    # mode, the counts are then reset to an empty Hash.
    def check
      counts = memory['filter_counts']
      if options['generate'] == "counts" && counts.present?
        counts.each do |filter, count|
          create_event :payload => { 'filter' => filter, 'count' => count, 'time' => Time.now.to_i }
        end
      end
      memory['filter_counts'] = {}
    end

    protected

    # Maps a matched filter to its canonical (primary) name.
    #
    # A configured non-Array filter equal to `filter` maps to `filter` itself.
    # For a configured Array filter, any member maps to the Array's first entry.
    # Filters are scanned in configuration order and the first match wins.
    #
    # @return the canonical filter, or nil when `filter` is not configured
    def lookup_filter(filter)
      options['filters'].each do |known_filter|
        return filter if known_filter == filter
        return known_filter.first if known_filter.is_a?(Array) && known_filter.include?(filter)
      end
      # Explicit nil: falling off the end of `each` would otherwise return the
      # `filters` Array itself (Array#each returns its receiver). That value is
      # truthy, so callers would treat it as a matched filter name.
      nil
    end

    # Deletes entries from agent.memory[base] whose keys are not the primary
    # name (as a String) of any filter in agent.options['filters'].
    # Does nothing when agent.memory[base] is not set.
    def remove_unused_keys!(agent, base)
      counts = agent.memory[base]
      return unless counts

      known = agent.options['filters'].map { |f| f.is_a?(Array) ? f.first.to_s : f.to_s }
      (counts.keys - known).each { |removed_key| counts.delete(removed_key) }
    end
  end
end