-
Notifications
You must be signed in to change notification settings - Fork 921
/
streams.go
323 lines (299 loc) · 10.3 KB
/
streams.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
package twitter
import (
"encoding/json"
"io"
"net/http"
"sync"
"time"
"github.com/cenkalti/backoff"
"github.com/dghubble/sling"
)
// userAgent is the User-Agent header value newStreamService sets on the
// base sling used for streaming requests.
const userAgent = "go-twitter v0.1"

// Base URLs (API version 1.1) of the Twitter streaming hosts used by
// StreamService: public statuses, per-user, and site streams.
const (
	publicStream = "https://stream.twitter.com/1.1/"
	userStream   = "https://userstream.twitter.com/1.1/"
	siteStream   = "https://sitestream.twitter.com/1.1/"
)
// StreamService provides methods for accessing the Twitter Streaming API.
// It keeps one preconfigured sling per streaming host and the HTTP client
// that every Stream it opens uses to connect.
type StreamService struct {
	client *http.Client // passed to newStream for each opened Stream
	public *sling.Sling // publicStream base, under the statuses/ path
	user   *sling.Sling // userStream base
	site   *sling.Sling // siteStream base
}
// newStreamService returns a StreamService that connects with client.
//
// The User-Agent header is set on base itself (mutating the caller's sling)
// before the public, user, and site slings are derived from it with New(),
// so each derived sling starts from base's configuration.
func newStreamService(client *http.Client, base *sling.Sling) *StreamService {
	base.Set("User-Agent", userAgent)
	srv := &StreamService{client: client}
	srv.public = base.New().Base(publicStream).Path("statuses/")
	srv.user = base.New().Base(userStream)
	srv.site = base.New().Base(siteStream)
	return srv
}
// StreamFilterParams are parameters for StreamService.Filter. Fields tagged
// "comma" are sent as comma-joined lists; "omitempty" fields are left out of
// the request when they hold their zero value.
type StreamFilterParams struct {
	FilterLevel   string   `url:"filter_level,omitempty"`
	Follow        []string `url:"follow,omitempty,comma"`
	Language      []string `url:"language,omitempty,comma"`
	Locations     []string `url:"locations,omitempty,comma"`
	StallWarnings *bool    `url:"stall_warnings,omitempty"`
	Track         []string `url:"track,omitempty,comma"`
}

// Filter returns messages that match one or more filter predicates. The
// params are form-encoded into the body of a POST to statuses/filter.json.
// https://dev.twitter.com/streaming/reference/post/statuses/filter
func (srv *StreamService) Filter(params *StreamFilterParams) (*Stream, error) {
	filter := srv.public.New().Post("filter.json").BodyForm(params)
	req, err := filter.Request()
	if err != nil {
		return nil, err
	}
	return newStream(srv.client, req), nil
}
// StreamSampleParams are the parameters for StreamService.Sample. They are
// encoded into the request's query string.
type StreamSampleParams struct {
	StallWarnings *bool    `url:"stall_warnings,omitempty"`
	Language      []string `url:"language,omitempty,comma"`
}

// Sample returns a small sample of public stream messages by issuing a GET
// for statuses/sample.json.
// https://dev.twitter.com/streaming/reference/get/statuses/sample
func (srv *StreamService) Sample(params *StreamSampleParams) (*Stream, error) {
	sample := srv.public.New().Get("sample.json").QueryStruct(params)
	req, err := sample.Request()
	if err != nil {
		return nil, err
	}
	return newStream(srv.client, req), nil
}
// StreamUserParams are the parameters for StreamService.User. They are
// encoded into the request's query string.
type StreamUserParams struct {
	FilterLevel   string   `url:"filter_level,omitempty"`
	Language      []string `url:"language,omitempty,comma"`
	Locations     []string `url:"locations,omitempty,comma"`
	Replies       string   `url:"replies,omitempty"`
	StallWarnings *bool    `url:"stall_warnings,omitempty"`
	Track         []string `url:"track,omitempty,comma"`
	With          string   `url:"with,omitempty"`
}

// User returns a stream of messages specific to the authenticated User,
// issuing a GET for user.json on the user stream host.
// https://dev.twitter.com/streaming/reference/get/user
func (srv *StreamService) User(params *StreamUserParams) (*Stream, error) {
	user := srv.user.New().Get("user.json").QueryStruct(params)
	req, err := user.Request()
	if err != nil {
		return nil, err
	}
	return newStream(srv.client, req), nil
}
// StreamSiteParams are the parameters for StreamService.Site. They are
// encoded into the request's query string.
type StreamSiteParams struct {
	FilterLevel   string   `url:"filter_level,omitempty"`
	Follow        []string `url:"follow,omitempty,comma"`
	Language      []string `url:"language,omitempty,comma"`
	Replies       string   `url:"replies,omitempty"`
	StallWarnings *bool    `url:"stall_warnings,omitempty"`
	With          string   `url:"with,omitempty"`
}

// Site returns messages for a set of users, issuing a GET for site.json on
// the site stream host. Requires special permission to access.
// https://dev.twitter.com/streaming/reference/get/site
func (srv *StreamService) Site(params *StreamSiteParams) (*Stream, error) {
	site := srv.site.New().Get("site.json").QueryStruct(params)
	req, err := site.Request()
	if err != nil {
		return nil, err
	}
	return newStream(srv.client, req), nil
}
// StreamFirehoseParams are the parameters for StreamService.Firehose. They
// are encoded into the request's query string; a zero Count is omitted.
type StreamFirehoseParams struct {
	Count         int      `url:"count,omitempty"`
	FilterLevel   string   `url:"filter_level,omitempty"`
	Language      []string `url:"language,omitempty,comma"`
	StallWarnings *bool    `url:"stall_warnings,omitempty"`
}

// Firehose returns all public messages and statuses by issuing a GET for
// statuses/firehose.json. Requires special permission to access.
// https://dev.twitter.com/streaming/reference/get/statuses/firehose
func (srv *StreamService) Firehose(params *StreamFirehoseParams) (*Stream, error) {
	firehose := srv.public.New().Get("firehose.json").QueryStruct(params)
	req, err := firehose.Request()
	if err != nil {
		return nil, err
	}
	return newStream(srv.client, req), nil
}
// Stream maintains a connection to the Twitter Streaming API, receives
// messages from the streaming response, and sends them on the Messages
// channel from a goroutine. Messages carries decoded message structs, the
// raw data map for unrecognized messages, or errors.
//
// When a 200 response ends (e.g. EOF), the goroutine reconnects; 503, 420
// and 429 responses are retried according to the Twitter backoff policies.
// The goroutine stops itself, closing the Messages channel, on an HTTP
// client error, any other status code, or when a backoff policy gives up.
//
// The client must Stop() the stream when finished receiving, which will
// wait until the stream is properly stopped.
type Stream struct {
	client   *http.Client
	Messages chan interface{}
	done     chan struct{}
	group    *sync.WaitGroup

	// mu guards body, which retry writes from the stream goroutine while
	// Stop reads it from the caller's goroutine.
	mu   sync.Mutex
	body io.Closer
}

// newStream creates a Stream and starts a goroutine to retry connecting and
// receive from a stream response. The goroutine may stop due to retry errors
// or be stopped by calling Stop() on the stream.
func newStream(client *http.Client, req *http.Request) *Stream {
	s := &Stream{
		client:   client,
		Messages: make(chan interface{}),
		done:     make(chan struct{}),
		group:    &sync.WaitGroup{},
	}
	s.group.Add(1)
	go s.retry(req, newExponentialBackOff(), newAggressiveExponentialBackOff())
	return s
}

// Stop signals retry and receiver to stop, closes the Messages channel, and
// blocks until done. Stop must be called at most once: a second call closes
// the already-closed done channel, which panics.
func (s *Stream) Stop() {
	close(s.done)
	// Reading the response body can block until the next message or
	// keep-alive arrives, so a low volume stream would not notice done
	// promptly. Close the current body to unblock the receiver.
	s.mu.Lock()
	if s.body != nil {
		s.body.Close()
	}
	s.mu.Unlock()
	// block until the retry goroutine stops; it closes Messages before
	// releasing the wait group
	s.group.Wait()
}

// retry retries making the given http.Request and receiving the response
// according to the Twitter backoff policies. Callers should invoke in a
// goroutine since backoffs sleep between retries.
// https://dev.twitter.com/streaming/overview/connecting
func (s *Stream) retry(req *http.Request, expBackOff backoff.BackOff, aggExpBackOff backoff.BackOff) {
	// Deferred calls run last-in-first-out: Messages is closed first, then
	// the wait group is decremented, so Messages is closed when Stop returns.
	defer s.group.Done()
	defer close(s.Messages)
	var wait time.Duration
	for !stopped(s.done) {
		resp, err := s.client.Do(req)
		if err != nil {
			// Stop retrying for HTTP protocol errors. Don't block on the
			// send if the client has called Stop() and is not receiving,
			// or Stop() would wait on this goroutine forever.
			select {
			case s.Messages <- err:
			case <-s.done:
			}
			return
		}
		// When err is nil, resp contains a non-nil Body which must be closed.
		// Every path below closes it explicitly; a defer here would pile up
		// one deferred call per reconnect until retry returns.
		s.mu.Lock()
		s.body = resp.Body
		s.mu.Unlock()
		if stopped(s.done) {
			// Stop() may have inspected s.body before this body was
			// published, so it could not close it; do so here instead of
			// blocking in receive.
			resp.Body.Close()
			return
		}
		switch resp.StatusCode {
		case 200:
			// receive until the body ends, a read fails, or Stop() is called
			s.receive(resp.Body)
			expBackOff.Reset()
			aggExpBackOff.Reset()
			// NOTE(review): wait is not reset here, so reconnecting after a
			// dropped stream sleeps for the most recent backoff duration
			// (zero if none has occurred yet) — confirm this is intended.
		case 503:
			// exponential backoff
			wait = expBackOff.NextBackOff()
		case 420, 429:
			// aggressive exponential backoff
			wait = aggExpBackOff.NextBackOff()
		default:
			// stop retrying for other response codes
			resp.Body.Close()
			return
		}
		// close response before each retry
		resp.Body.Close()
		if wait == backoff.Stop {
			return
		}
		sleepOrDone(wait, s.done)
	}
}
// receive reads tokens from a stream response body with the stream response
// body reader, skips empty tokens (keep-alives), decodes each remaining token
// with getMessage, and sends the result on the Messages channel.
//
// It returns when readNext reports an error (presumably io.EOF at the end of
// the body, or the failure caused by Stop() closing it — TODO confirm) or
// once the done channel is closed.
func (s *Stream) receive(body io.Reader) {
	r := newStreamResponseBodyReader(body)
	for !stopped(s.done) {
		token, err := r.readNext()
		switch {
		case err != nil:
			return
		case len(token) == 0:
			// empty keep-alive carries no message
			continue
		}
		select {
		case s.Messages <- getMessage(token):
			// delivered messages, data maps, or errors; read the next token
		case <-s.done:
			// allow the client to Stop(), even if it is not receiving
			return
		}
	}
}
// getMessage JSON decodes a single stream token into a generic map and hands
// it to decodeMessage, which returns a message struct when the type can be
// determined and the map otherwise. If the token cannot be unmarshalled into
// a map (malformed JSON, or e.g. a JSON array), the unmarshal error itself is
// returned as the message.
func getMessage(token []byte) interface{} {
	var fields map[string]interface{}
	if err := json.Unmarshal(token, &fields); err != nil {
		return err
	}
	return decodeMessage(token, fields)
}
// decodeMessage picks a message type from the top-level keys present in data,
// allocates at most one message struct, and JSON decodes token into it. Keys
// are checked in a fixed order, so a token carrying several marker keys is
// decoded as the first match. For notice wrappers, the inner payload field is
// returned rather than the wrapper. When no marker key is present, the data
// map itself is returned.
//
// json.Unmarshal errors are not reported. token has already been decoded
// into data, so it is syntactically valid JSON; per encoding/json, a value of
// the wrong type is skipped and the remaining fields are still filled in.
func decodeMessage(token []byte, data map[string]interface{}) interface{} {
	switch {
	case hasPath(data, "retweet_count"):
		var tweet Tweet
		json.Unmarshal(token, &tweet)
		return &tweet
	case hasPath(data, "direct_message"):
		var n directMessageNotice
		json.Unmarshal(token, &n)
		return n.DirectMessage
	case hasPath(data, "delete"):
		var n statusDeletionNotice
		json.Unmarshal(token, &n)
		return n.Delete.StatusDeletion
	case hasPath(data, "scrub_geo"):
		var n locationDeletionNotice
		json.Unmarshal(token, &n)
		return n.ScrubGeo
	case hasPath(data, "limit"):
		var n streamLimitNotice
		json.Unmarshal(token, &n)
		return n.Limit
	case hasPath(data, "status_withheld"):
		var n statusWithheldNotice
		json.Unmarshal(token, &n)
		return n.StatusWithheld
	case hasPath(data, "user_withheld"):
		var n userWithheldNotice
		json.Unmarshal(token, &n)
		return n.UserWithheld
	case hasPath(data, "disconnect"):
		var n streamDisconnectNotice
		json.Unmarshal(token, &n)
		return n.StreamDisconnect
	case hasPath(data, "warning"):
		var n stallWarningNotice
		json.Unmarshal(token, &n)
		return n.StallWarning
	case hasPath(data, "friends"):
		var friends FriendsList
		json.Unmarshal(token, &friends)
		return &friends
	case hasPath(data, "event"):
		var event Event
		json.Unmarshal(token, &event)
		return &event
	default:
		// message type unknown, return the data map[string]interface{}
		return data
	}
}
// hasPath reports whether key is present in data. A key mapped to a nil
// value still counts as present; a nil map contains no keys.
func hasPath(data map[string]interface{}, key string) (present bool) {
	_, present = data[key]
	return present
}