# twitternet.py
import logging, socket, sys, re, numpy, pickle
import TwistedTwitterStream
from backend.snoc import SocNOC
from vectornet.utils import ProducingNode, BasicNode
from secrets import TWITTER_USER, TWITTER_PASSWORD
# Every message emitted by log() uses the numeric level 25 (between the
# standard INFO=20 and WARNING=30), so the root logger threshold is set to
# that same value.
rootLogger = logging.getLogger()
rootLogger.setLevel(25)

# The stdout handler is attached lazily by log() on its first call, which is
# why the module starts out with no handler at all.
handler = None

# Module logger, named after the machine this process runs on.
logger = logging.getLogger(socket.gethostname())
#logger.addHandler(logging.FileHandler('log'))
def log(obj, msg):
    """Log *msg* at level 25, prefixed with the name of the node *obj*.

    The first call creates a stdout ``StreamHandler`` and attaches it to the
    module logger; later calls reuse it.  ``obj`` must provide
    ``obj.node['name']``; that name is wrapped in ANSI escape codes
    (bold red, then reset) in the emitted line.
    """
    global handler

    # Lazily install the stdout handler exactly once per process.
    if handler is None:
        handler = logging.StreamHandler(sys.stdout)
        logger.addHandler(handler)

    label = obj.node['name']
    line = '\033[1;31m%s\033[0m: %s' % (label, msg)
    logger.log(25, line)
def make_send(sender, keys=True):
    """Return a callback that logs a message and then sends it via *sender*.

    When ``keys`` is true (the default) only ``data.keys()`` is logged;
    otherwise the whole payload is logged.  In both cases the complete
    ``data`` object is handed to ``sender.sendMessage``.
    """
    def _send(data):
        # Decide what to show in the log before forwarding the payload.
        if keys:
            summary = data.keys()
        else:
            summary = data
        log(sender, summary)
        sender.sendMessage(data)

    return _send
class TwitterStream(ProducingNode):
    """Producer node fed by ``TwistedTwitterStream.sample``."""

    def startProducing(self):
        """Open the sample stream and forward every non-delete message."""
        node = self
        forward = make_send(node)

        class Consumer(TwistedTwitterStream.TweetReceiver):
            # The callbacks are bound to the receiver instance; the
            # enclosing node is reached through the closure instead.

            def connectionFailed(receiver, why):
                log(node, 'connection failed (%s)' % why)

            def tweetReceived(receiver, data):
                # Messages carrying a 'delete' key are dropped.
                if 'delete' in data:
                    return
                forward(data)

        TwistedTwitterStream.sample(TWITTER_USER, TWITTER_PASSWORD, Consumer())
class SpecificStream(ProducingNode):
    """Producer node that follows the topics in ``self.node['_topics']``.

    The first whitespace-separated word of every topic is passed to
    ``TwistedTwitterStream.filter`` as the ``track`` list; received messages
    are forwarded only when their text matches one of the full topics
    (case-insensitively).
    """

    def startProducing(self):
        """Open the filtered stream and forward matching messages."""
        node = self
        send = make_send(node)
        topics = self.node['_topics']

        # NOTE(review): topics are joined into the pattern unescaped, so any
        # regex metacharacter in a topic is interpreted as regex syntax --
        # confirm that this is intended.
        regex = re.compile('|'.join([x.lower() for x in topics]), re.I)
        track = [x.split(None, 1)[0] for x in topics]

        class Consumer(TwistedTwitterStream.TweetReceiver):
            def connectionFailed(this, why):
                log(node, 'connection failed (%s)' % why)

            def tweetReceived(this, data):
                # Not every stream message is a tweet: messages without a
                # 'user' or 'text' key (presumably notices such as rate-limit
                # messages) used to raise KeyError here.  They are now
                # skipped instead.
                if 'delete' in data or not data.get('user'):
                    return
                text = data.get('text')
                if text is not None and regex.search(text):
                    send(data)

        TwistedTwitterStream.filter(TWITTER_USER, TWITTER_PASSWORD, Consumer(), track=track)
class BlogStream(ProducingNode):
    """Producer node that polls blog feeds and emits their posts.

    ``self.node['_blogs']`` supplies the feed descriptions; each entry of
    ``self.feeds`` is unpacked as ``(url, tag)`` when polling starts.
    """

    def __init__(self, router, nodeDict):
        ProducingNode.__init__(self, router, nodeDict)
        from backend.utils import weave_streams, make_tuples
        # presumably interleaves the (url, tag) tuples of all configured
        # blogs -- verify against backend.utils
        self.feeds = weave_streams(make_tuples(*pair) for pair in self.node['_blogs'])

    def startProducing(self):
        """Start one fetch/read cycle per feed on the Twisted reactor."""
        from twisted.internet import reactor
        from twisted.web import client
        from random import random
        import feedparser
        send = make_send(self)

        def read(feed, url, tag):
            # Emit one item per call; once the feed is exhausted, fetch the
            # page again.  Every step is rescheduled after a random delay
            # of less than one second.
            try:
                item = next(feed)
            except StopIteration:
                reactor.callLater(random(), get, url, tag)
            else:
                post = SocNOC.process_feed_item(item)
                if isinstance(post, basestring):
                    send(dict(post=post, word=tag))
                reactor.callLater(random(), read, feed, url, tag)

        def retry(failure, url, tag):
            # The original errback re-issued the request immediately and
            # discarded the failure, which spins without pause while a feed
            # stays unreachable.  Report the error and retry after the same
            # random delay used everywhere else.
            log(self, 'feed %s failed (%s); retrying' % (url, failure.getErrorMessage()))
            reactor.callLater(random(), get, url, tag)

        def get(url, tag):
            d = client.getPage(url)
            d.addCallback(feedparser.parse)
            d.addCallback(lambda feed: iter(feed['items']))
            d.addCallback(read, url, tag)
            d.addErrback(retry, url, tag)

        for feed in self.feeds:
            get(*feed)
class TwitterProcess(BasicNode):
    """Node that feeds incoming tweets to a ``SocNOC`` instance.

    Whatever the ``SocNOC`` object passes to its ``send`` attribute is
    logged and sent on through this node.
    """

    def __init__(self, router, nodeDict):
        BasicNode.__init__(self, router, nodeDict)
        self.snoc = SocNOC()
        self.snoc.send = make_send(self)
        if '_categories' in self.node:
            # Pickles are byte streams, so the file is opened in binary mode;
            # text mode can corrupt the data on platforms that translate
            # line endings and fails outright on Python 3.
            # NOTE: pickle.load can execute arbitrary code -- the pickle file
            # must come from a trusted source.
            with open('backend/affect.pickle', 'rb') as affect_file:
                affect = pickle.load(affect_file)
            # Configured categories plus the pickled 'affect' entry.
            self.snoc.categories = dict(self.node['_categories'], affect=affect)

    def compute(self, data):
        """Pass every value of *data* to ``SocNOC.receive_tweet``."""
        for tweet in data.values():
            self.snoc.receive_tweet(tweet)
class BlogProcess(BasicNode):
    """Node that feeds incoming blog posts to a ``SocNOC`` instance.

    The ``SocNOC`` object is created with ``k=10, spicefreq=0, cnetfreq=0``;
    whatever it passes to its ``send`` attribute is logged and sent on
    through this node.
    """

    def __init__(self, router, nodeDict):
        BasicNode.__init__(self, router, nodeDict)
        self.snoc = SocNOC(k=10, spicefreq=0, cnetfreq=0)
        self.snoc.send = make_send(self)
        if '_categories' in self.node:
            # Pickles are byte streams, so the file is opened in binary mode;
            # text mode can corrupt the data on platforms that translate
            # line endings and fails outright on Python 3.
            # NOTE: pickle.load can execute arbitrary code -- the pickle file
            # must come from a trusted source.
            with open('backend/affect.pickle', 'rb') as affect_file:
                affect = pickle.load(affect_file)
            # Configured categories plus the pickled 'affect' entry.
            self.snoc.categories = dict(self.node['_categories'], affect=affect)

    def compute(self, data):
        """Call ``SocNOC.process_post`` with each value of *data* as keywords."""
        for datum in data.values():
            self.snoc.process_post(**datum)
class TwitterSom(BasicNode):
    """Node that hands incoming messages to a ``SOMBuilder``.

    The builder is created with ``k=20`` and a map size taken from
    ``self.node['_somsize']``; its ``send`` attribute is wired to this node.
    """

    def __init__(self, router, nodeDict):
        BasicNode.__init__(self, router, nodeDict)
        from backend.som import SOMBuilder
        map_size = tuple(self.node['_somsize'])
        self.som = SOMBuilder(k=20, map_size=map_size)
        self.som.send = make_send(self)

    def compute(self, data):
        """Pass every value of *data* to the builder's ``on_message``."""
        for message in data.values():
            self.som.on_message(message)
class RfbfSom(BasicNode):
    """Node that hands incoming messages to a ``SOMFish``.

    The map is created from ``self.node['_fixed']`` with ``k=10`` and a map
    size taken from ``self.node['_somsize']``; its ``send`` attribute is
    wired to this node.
    """

    def __init__(self, router, nodeDict):
        BasicNode.__init__(self, router, nodeDict)
        from backend.somfish import SOMFish
        fixed = self.node['_fixed']
        map_size = tuple(self.node['_somsize'])
        self.som = SOMFish(fixed, k=10, map_size=map_size)
        self.som.send = make_send(self)

    def compute(self, data):
        """Pass every value of *data* to the map's ``on_message``."""
        for message in data.values():
            self.som.on_message(message)
class RfbfVec(BasicNode):
    """Node that places each concept of a message on politics/affect axes.

    For every incoming datum with usable text, one message per concept is
    sent containing the concept name, the text, a size derived from the
    concept vector's norm, and ``x``/``y`` coordinates: the cosine of the
    concept vector against the politics axis and the affect axis.
    """

    def __init__(self, router, nodeDict):
        BasicNode.__init__(self, router, nodeDict)
        self.send = make_send(self)

    @staticmethod
    def orthogonalize(vec1, vec2):
        """Return *vec2* with its projection onto *vec1* removed."""
        return vec2 - vec1 * numpy.vdot(vec1, vec2) / numpy.vdot(vec1, vec1)

    @staticmethod
    def unpack(dct):
        """Map each key to its value as a numpy array minus the first element."""
        # NOTE(review): the meaning of the dropped leading component is not
        # visible here -- confirm against the producer of these vectors.
        return dict((k, numpy.array(v)[1:]) for k, v in dct.items())

    def compute(self, data):
        """Send one positioned message per concept of every usable datum.

        A datum is skipped when it has no text, when its text starts with
        '(', when any category vector is all zeros, or when an axis
        collapses to zero length.  Concepts with a zero-length vector are
        skipped too, since their coordinates would be NaN.
        """
        for datum in data.values():
            text = datum.get('text', None)
            if not text or text[0] == '(':
                continue

            categories = self.unpack(datum['categories'])
            concepts = self.unpack(datum['concepts'])
            if not all(vec.any() for vec in categories.values()):
                continue  # ignore if any are zero vectors

            # Politics axis is made orthogonal to 'person', the affect axis
            # orthogonal to the resulting politics axis.
            politics = self.orthogonalize(categories['person'], categories['politics'])
            affect = self.orthogonalize(politics, categories['affect'])
            pnorm, anorm = numpy.linalg.norm(politics), numpy.linalg.norm(affect)
            if pnorm == 0 or anorm == 0:
                # A zero-length axis would turn every coordinate into NaN.
                continue

            concepts.pop('empty', None)  # ignore 'empty' concept if it exists
            for con, vec in concepts.items():
                vnorm = numpy.linalg.norm(vec)
                if vnorm == 0:
                    # No direction to project; dividing would give NaN.
                    continue
                self.send({'concept': con,
                           'text': text,
                           'size': float(numpy.sqrt(numpy.sqrt(vnorm))),
                           'x': float(numpy.vdot(politics, vec) / pnorm / vnorm),
                           'y': float(numpy.vdot(affect, vec) / anorm / vnorm)})