-
Notifications
You must be signed in to change notification settings - Fork 0
/
tweet_producer.py
79 lines (58 loc) · 2.2 KB
/
tweet_producer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import json
import os
import time

import tweepy
from kafka import KafkaProducer
# Twitter API credentials. Read from the environment so secrets need not be
# committed to source; the empty-string fallbacks match the original literals.
ckey = os.environ.get("TWITTER_CONSUMER_KEY", "")
csecret = os.environ.get("TWITTER_CONSUMER_SECRET", "")
atoken = os.environ.get("TWITTER_ACCESS_TOKEN", "")
asecret = os.environ.get("TWITTER_ACCESS_TOKEN_SECRET", "")
# Twitter Authentication and Initialization
twitter_auth = tweepy.OAuthHandler(ckey, csecret)
twitter_auth.set_access_token(atoken, asecret)
# In tweepy 3.x, wait_on_rate_limit_notify only takes effect when
# wait_on_rate_limit is also enabled, so both are set.
twitter_api = tweepy.API(
    twitter_auth,
    wait_on_rate_limit=True,
    wait_on_rate_limit_notify=True,
    retry_count=3,
    retry_delay=5,
)
# NOTE(review): constructing the handler/API does not contact Twitter, so this
# message does not mean the credentials were verified
# (twitter_api.verify_credentials() would check them).
print("######### Twitter Authentication Success ######")
# Kafka producer using kafka-python's default bootstrap servers. The serializer
# JSON-encodes each value; json.dumps escapes non-ASCII by default, so the
# 'ascii' encode cannot fail.
producer = KafkaProducer(value_serializer=lambda m: json.dumps(m).encode('ascii'))
# Twitter Stream Listener Implementation
class StreamListener(tweepy.StreamListener):
    """Forward geotagged tweets from the Twitter stream to the Kafka 'tweet' topic.

    Each raw stream message is parsed as JSON, and only messages with a
    non-null ``coordinates`` field are forwarded. To throttle the stream, the
    listener sleeps for 10 seconds after every 10 geotagged tweets.
    """

    # Number of geotagged tweets seen since the last throttling pause.
    count = 0

    def on_data(self, data):
        """Handle one raw stream message.

        Undecodable or malformed messages are reported and skipped. Exceptions
        raised by ``producer.send`` are reported without stopping the stream.
        A single bad message therefore never terminates the stream.
        """
        try:
            tweet = json.loads(data)
        except ValueError as e:
            # json.JSONDecodeError and UnicodeDecodeError both subclass ValueError.
            print("Skipping undecodable stream message: {}".format(e))
            return
        # Control messages (e.g. delete or limit notices) have no 'coordinates'
        # key; .get() skips them quietly instead of raising KeyError.
        if not isinstance(tweet, dict) or tweet.get('coordinates') is None:
            return
        try:
            coordinates = tweet["coordinates"]["coordinates"]
            tweet_struct = {
                # Twitter reports GeoJSON [longitude, latitude]; swap to
                # [latitude, longitude].
                "coordinates": [coordinates[1], coordinates[0]],
                "username": tweet["user"]["name"],
                "screenname": tweet["user"]["screen_name"],
                "text": tweet["text"],
                "sentiment": "",
                "id": tweet["id"],
            }
        except (KeyError, IndexError, TypeError) as e:
            print("Skipping malformed tweet: {!r}".format(e))
            return
        print(tweet_struct)
        # Slow down the stream by sleeping for 10 seconds after every 10 tweets received
        self.count += 1
        if self.count == 10:
            self.count = 0
            print("Sleeping")
            time.sleep(10)
        try:
            # NOTE(review): the producer's value_serializer already JSON-encodes
            # values, so this payload is double-encoded (consumers receive a JSON
            # string containing JSON). Kept as-is to preserve the wire format.
            producer.send('tweet', json.dumps(tweet_struct))
        except Exception as e:
            # Best effort: a failed send drops this tweet but keeps the stream alive.
            print("Failed to send tweet to Kafka: {}".format(e))

    def on_error(self, status_code):
        """Report a non-200 stream response and keep the stream connected.

        Returning True (never False) tells tweepy 3.x not to disconnect, so it
        reconnects using its own backoff.
        """
        if status_code == 420:
            print("Rate Limit")
        else:
            print("Stream error: HTTP {}".format(status_code))
        return True
def main(retry_delay=5):
    """Stream tweets forever, rebuilding the stream after any exception.

    :param retry_delay: seconds to wait before reconnecting after an exception.
        The pause keeps a persistent failure from turning into a tight
        reconnect loop. Defaults to 5.
    """
    print("Twitter Stream Begin!!")
    # One listener for the process lifetime, so its throttling counter
    # survives reconnects.
    stream_listener = StreamListener()
    while True:
        try:
            streamer = tweepy.Stream(twitter_api.auth, listener=stream_listener)
            # NOTE(review): Twitter combines track and locations with OR. The
            # whole-globe bounding box therefore matches geotagged tweets
            # regardless of the 'trump' keyword — confirm this is intended.
            streamer.filter(track=['trump'], locations=[-180, -90, 180, 90], languages=['en'])
        except Exception as e:
            print(e)
            print("Reconnecting in {} seconds".format(retry_delay))
            time.sleep(retry_delay)
if __name__ == '__main__':
    # Start streaming only when executed as a script, not when imported.
    main()