Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Newer
Older
100644 194 lines (164 sloc) 6.071 kB
755700f @joshthecoder Adding stream object to library. Got stream reading loop working. Add…
joshthecoder authored
1 # Tweepy
2 # Copyright 2009 Joshua Roesslein
3 # See LICENSE
4
5 import httplib
d4f265a @joshthecoder Added timeout to stream connection to prevent deadlock on disconnect()
joshthecoder authored
6 from socket import timeout
755700f @joshthecoder Adding stream object to library. Got stream reading loop working. Add…
joshthecoder authored
7 from threading import Thread
d4f265a @joshthecoder Added timeout to stream connection to prevent deadlock on disconnect()
joshthecoder authored
8 from time import sleep
eb1d8bb @joshthecoder Move parameters into POST body in streaming.py to avoid "head too big…
joshthecoder authored
9 import urllib
755700f @joshthecoder Adding stream object to library. Got stream reading loop working. Add…
joshthecoder authored
10
405e0b8 @joshthecoder Change to absolute imports.
joshthecoder authored
11 from tweepy.auth import BasicAuthHandler
12 from tweepy.parsers import parse_status
13 from tweepy.api import API
14 from tweepy.error import TweepError
755700f @joshthecoder Adding stream object to library. Got stream reading loop working. Add…
joshthecoder authored
15
16 try:
12e03dc @gumptionthomas Modified another json import to work on app engine
gumptionthomas authored
17 import json #Python >= 2.6
755700f @joshthecoder Adding stream object to library. Got stream reading loop working. Add…
joshthecoder authored
18 except ImportError:
12e03dc @gumptionthomas Modified another json import to work on app engine
gumptionthomas authored
19 try:
20 import simplejson as json #Python < 2.6
21 except ImportError:
22 try:
23 from django.utils import simplejson as json #Google App Engine
24 except ImportError:
25 raise ImportError, "Can't load a json library"
755700f @joshthecoder Adding stream object to library. Got stream reading loop working. Add…
joshthecoder authored
26
dcbd075 @joshthecoder Update stream API to use new API method URLs. Added StreamListener to…
joshthecoder authored
27 STREAM_VERSION = 1
28
a00f378 @joshthecoder 2 space -> 4 space indents
joshthecoder authored
29
dcbd075 @joshthecoder Update stream API to use new API method URLs. Added StreamListener to…
joshthecoder authored
30 class StreamListener(object):
31
a00f378 @joshthecoder 2 space -> 4 space indents
joshthecoder authored
32 def on_status(self, status):
33 """Called when a new status arrives"""
34 return
35
36 def on_delete(self, status_id, user_id):
37 """Called when a delete notice arrives for a status"""
38 return
dcbd075 @joshthecoder Update stream API to use new API method URLs. Added StreamListener to…
joshthecoder authored
39
a00f378 @joshthecoder 2 space -> 4 space indents
joshthecoder authored
40 def on_limit(self, track):
41 """Called when a limitation notice arrvies"""
42 return
dcbd075 @joshthecoder Update stream API to use new API method URLs. Added StreamListener to…
joshthecoder authored
43
a00f378 @joshthecoder 2 space -> 4 space indents
joshthecoder authored
44 def on_error(self, status_code):
45 """Called when a non-200 status code is returned"""
46 return False
dcbd075 @joshthecoder Update stream API to use new API method URLs. Added StreamListener to…
joshthecoder authored
47
a00f378 @joshthecoder 2 space -> 4 space indents
joshthecoder authored
48 def on_timeout(self):
49 """Called when stream connection times out"""
50 return
dcbd075 @joshthecoder Update stream API to use new API method URLs. Added StreamListener to…
joshthecoder authored
51
ecdc8b5 @joshthecoder Added on_timeout() to stream listener.
joshthecoder authored
52
755700f @joshthecoder Adding stream object to library. Got stream reading loop working. Add…
joshthecoder authored
53 class Stream(object):
54
a00f378 @joshthecoder 2 space -> 4 space indents
joshthecoder authored
55 host = 'stream.twitter.com'
56
57 def __init__(self, username, password, listener, timeout=5.0, retry_count = None,
58 retry_time = 10.0, snooze_time = 5.0, buffer_size=1500):
59 self.auth = BasicAuthHandler(username, password)
60 self.running = False
61 self.timeout = timeout
62 self.retry_count = retry_count
63 self.retry_time = retry_time
64 self.snooze_time = snooze_time
65 self.buffer_size = buffer_size
66 self.listener = listener
67 self.api = API()
eb1d8bb @joshthecoder Move parameters into POST body in streaming.py to avoid "head too big…
joshthecoder authored
68 self.headers = {}
69 self.body = None
a00f378 @joshthecoder 2 space -> 4 space indents
joshthecoder authored
70
71 def _run(self):
72 # setup
eb1d8bb @joshthecoder Move parameters into POST body in streaming.py to avoid "head too big…
joshthecoder authored
73 self.auth.apply_auth(None, None, self.headers, None)
a00f378 @joshthecoder 2 space -> 4 space indents
joshthecoder authored
74
75 # enter loop
76 error_counter = 0
77 conn = None
78 while self.running:
79 if self.retry_count and error_counter > self.retry_count:
80 # quit if error count greater than retry count
81 break
82 try:
83 conn = httplib.HTTPConnection(self.host)
84 conn.connect()
85 conn.sock.settimeout(self.timeout)
eb1d8bb @joshthecoder Move parameters into POST body in streaming.py to avoid "head too big…
joshthecoder authored
86 conn.request('POST', self.url, self.body, headers=self.headers)
a00f378 @joshthecoder 2 space -> 4 space indents
joshthecoder authored
87 resp = conn.getresponse()
88 if resp.status != 200:
89 if self.listener.on_error(resp.status) is False:
90 break
91 error_counter += 1
92 sleep(self.retry_time)
93 else:
94 error_counter = 0
95 self._read_loop(resp)
96 except timeout:
97 if self.listener.on_timeout() == False:
98 break
99 if self.running is False:
100 break
101 conn.close()
102 sleep(self.snooze_time)
103 except Exception:
104 # any other exception is fatal, so kill loop
105 break
106
107 # cleanup
108 self.running = False
109 if conn:
110 conn.close()
111
112 def _read_loop(self, resp):
113 data = ''
114 while self.running:
115 if resp.isclosed():
116 break
117
118 # read length
119 length = ''
120 while True:
121 c = resp.read(1)
122 if c == '\n':
123 break
124 length += c
125 length = length.strip()
126 if length.isdigit():
127 length = int(length)
128 else:
129 continue
130
131 # read data
132 data = resp.read(length)
133
134 # turn json data into status object
135 if 'in_reply_to_status_id' in data:
a5c206d @joshthecoder Fix streaming API regression. Thanks Pascal Jurgens!
joshthecoder authored
136 status = parse_status(json.loads(data), self.api)
a00f378 @joshthecoder 2 space -> 4 space indents
joshthecoder authored
137 if self.listener.on_status(status) == False:
138 self.running = False
139 elif 'delete' in data:
140 delete = json.loads(data)['delete']['status']
141 if self.listener.on_delete(delete['id'], delete['user_id']) == False:
142 self.running = False
143 elif 'limit' in data:
144 if self.listener.on_limit(json.loads(data)['limit']['track']) == False:
145 self.running = False
146
0b530bf @joshthecoder The streaming API Stream object can be run either in async/synch mode…
joshthecoder authored
147 def _start(self, async):
148 self.running = True
149 if async:
150 Thread(target=self._run).start()
151 else:
152 self._run()
153
154 def firehose(self, count=None, async=False):
a00f378 @joshthecoder 2 space -> 4 space indents
joshthecoder authored
155 if self.running:
156 raise TweepError('Stream object already connected!')
157 self.url = '/%i/statuses/firehose.json?delimited=length' % STREAM_VERSION
158 if count:
159 self.url += '&count=%s' % count
0b530bf @joshthecoder The streaming API Stream object can be run either in async/synch mode…
joshthecoder authored
160 self._start(async)
a00f378 @joshthecoder 2 space -> 4 space indents
joshthecoder authored
161
0b530bf @joshthecoder The streaming API Stream object can be run either in async/synch mode…
joshthecoder authored
162 def retweet(self, async=False):
914d9d5 @joshthecoder Added retweet streaming method.
joshthecoder authored
163 if self.running:
164 raise TweepError('Stream object already connected!')
165 self.url = '/%i/statuses/retweet.json?delimited=length' % STREAM_VERSION
0b530bf @joshthecoder The streaming API Stream object can be run either in async/synch mode…
joshthecoder authored
166 self._start(async)
914d9d5 @joshthecoder Added retweet streaming method.
joshthecoder authored
167
0b530bf @joshthecoder The streaming API Stream object can be run either in async/synch mode…
joshthecoder authored
168 def sample(self, count=None, async=False):
a00f378 @joshthecoder 2 space -> 4 space indents
joshthecoder authored
169 if self.running:
170 raise TweepError('Stream object already connected!')
171 self.url = '/%i/statuses/sample.json?delimited=length' % STREAM_VERSION
172 if count:
173 self.url += '&count=%s' % count
0b530bf @joshthecoder The streaming API Stream object can be run either in async/synch mode…
joshthecoder authored
174 self._start(async)
a00f378 @joshthecoder 2 space -> 4 space indents
joshthecoder authored
175
0b530bf @joshthecoder The streaming API Stream object can be run either in async/synch mode…
joshthecoder authored
176 def filter(self, follow=None, track=None, async=False):
eb1d8bb @joshthecoder Move parameters into POST body in streaming.py to avoid "head too big…
joshthecoder authored
177 params = {}
178 self.headers['Content-type'] = "application/x-www-form-urlencoded"
a00f378 @joshthecoder 2 space -> 4 space indents
joshthecoder authored
179 if self.running:
180 raise TweepError('Stream object already connected!')
181 self.url = '/%i/statuses/filter.json?delimited=length' % STREAM_VERSION
182 if follow:
eb1d8bb @joshthecoder Move parameters into POST body in streaming.py to avoid "head too big…
joshthecoder authored
183 params['follow'] = ','.join(map(str, follow))
a00f378 @joshthecoder 2 space -> 4 space indents
joshthecoder authored
184 if track:
eb1d8bb @joshthecoder Move parameters into POST body in streaming.py to avoid "head too big…
joshthecoder authored
185 params['track'] = ','.join(map(str, track))
186 self.body = urllib.urlencode(params)
0b530bf @joshthecoder The streaming API Stream object can be run either in async/synch mode…
joshthecoder authored
187 self._start(async)
a00f378 @joshthecoder 2 space -> 4 space indents
joshthecoder authored
188
189 def disconnect(self):
f266256 @joshthecoder Added better error handling and idle connection timeouts.
joshthecoder authored
190 if self.running is False:
a00f378 @joshthecoder 2 space -> 4 space indents
joshthecoder authored
191 return
192 self.running = False
193
Something went wrong with that request. Please try again.