-
Notifications
You must be signed in to change notification settings - Fork 0
/
tweets_sentiments.py
231 lines (210 loc) · 8.15 KB
/
tweets_sentiments.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
from mrjob.job import MRJob
from mrjob.step import MRStep
from shapely.geometry import shape
import json
import sys
import re
# Add the current working directory to the module search path.
# NOTE(review): presumably needed so that files shipped alongside the job can
# be imported on the worker nodes — confirm before removing.
sys.path.append('.')
# Tokenizer used by MRTweetSentiment.compute_sentiment: matches runs of word
# characters and apostrophes.
WORD_RE = re.compile(r"[\w']+")
# Maximum number of trending topics emitted by MRTweetSentiment.reducer_order.
_LIM_TRENDS = 10
class MRTweetSentiment(MRJob):
    """
    MapReduce job that analyses tweets written in Spanish and located in
    Spain.

    Depending on the ``--job-options`` value, the job outputs the accumulated
    sentiment score of every region, the region with the highest score, or
    the most frequent hashtags.
    """

    # Files read from the working directory of the task.
    _SENTIMENT_FILE = "Redondo_words.txt"
    _REGIONS_FILE = "comunidades.json"

    # Per-process caches. The mappers run once per tweet, so re-reading and
    # re-parsing these files on every call would dominate the running time.
    _sentiment_scores = None
    _region_shapes = None

    def configure_args(self):
        """
        Register the extra ``--job-options`` command line parameter.

        Accepted values:
            - sentiments: output every region with its computed sentiment
            score (default).
            - most-happy: output the region with the highest sentiment score.
            - trending: output the most popular trending topics (at most
            ``_LIM_TRENDS``).
        """
        super(MRTweetSentiment, self).configure_args()
        self.add_passthru_arg(
            '--job-options', default='sentiments',
            choices=['sentiments', 'most-happy', 'trending'],
            help="Specify the output of the job")

    def steps(self):
        """
        Define the execution steps based on the --job-options parameter.

        :return: list of MRStep.
        :raises ValueError: if the option is not one of the supported values.
        """
        job_option = self.options.job_options
        if job_option == 'sentiments':
            return [
                MRStep(mapper=self.mapper_region_sentiments,
                       combiner=self.combiner_sum_values,
                       reducer=self.reducer_region_sentiments)]
        if job_option == 'most-happy':
            return [
                MRStep(mapper=self.mapper_region_sentiments,
                       combiner=self.combiner_sum_values,
                       reducer=self.reducer_sentiments_region),
                MRStep(reducer=self.reducer_max_sentiment)]
        if job_option == 'trending':
            return [
                MRStep(mapper=self.mapper_trending,
                       combiner=self.combiner_sum_values,
                       reducer=self.reducer_trending),
                MRStep(reducer=self.reducer_order)]
        # Fail loudly instead of implicitly returning None, which would only
        # surface later as an unrelated error.
        raise ValueError(
            "Unsupported --job-options value: %r" % (job_option,))

    @staticmethod
    def sentiment_dictionary():
        """
        Read the tab-delimited score file and build the word-score mapping.

        Every non-blank line must have the form ``<term>\\t<score>``.

        :return: dictionary of words with their associated sentiment score
        (float).
        :raises ValueError: if a non-blank line is not ``term<TAB>score`` or
        the score is not a number.
        """
        # TODO: find a way to access --file without specifying file name.
        scores = {}
        # The context manager guarantees the file handle is released.
        with open(MRTweetSentiment._SENTIMENT_FILE) as score_file:
            for line in score_file:
                if not line.strip():
                    # Tolerate blank lines (e.g. a trailing newline).
                    continue
                term, score = line.split("\t")  # The file is tab-delimited.
                scores[term] = float(score)
        return scores

    @classmethod
    def _cached_sentiment_scores(cls):
        """Return the word-score mapping, loading the file on first use."""
        if cls._sentiment_scores is None:
            cls._sentiment_scores = cls.sentiment_dictionary()
        return cls._sentiment_scores

    @classmethod
    def _cached_region_shapes(cls):
        """
        Return the (region name, geometry) pairs, loading the regions file
        on first use. The order of the file is preserved.
        """
        if cls._region_shapes is None:
            with open(cls._REGIONS_FILE) as regions:
                cls._region_shapes = [
                    (name, shape(geometry))
                    for name, geometry in json.load(regions).items()]
        return cls._region_shapes

    def compute_sentiment(self, tweet_text):
        """
        Add up the scores of the words of a text that appear in the
        sentiment dictionary. Words are matched in lower case; unknown words
        do not contribute.

        :param tweet_text: string with the tweet text.
        :return: accumulated score of the text (0 when no word is known).
        """
        scores = self._cached_sentiment_scores()
        return sum(scores.get(word.lower(), 0)
                   for word in WORD_RE.findall(tweet_text))

    @staticmethod
    def box_to_region(bounding_box):
        """
        Find the first region whose geometry fully contains the geometry
        described by bounding_box.

        :param bounding_box: dict that includes coordinates of a tweet; may
        be None or empty when the tweet has no bounding box.
        :return: str representing the region where bounding_box is included,
        or None when there is no bounding box or no region contains it.
        """
        if not bounding_box:
            return None
        # Built once, not once per region.
        tweet_shape = shape(bounding_box)
        for region, region_shape in MRTweetSentiment._cached_region_shapes():
            if region_shape.contains(tweet_shape):
                return region
        return None

    @staticmethod
    def _spanish_tweet(line):
        """
        Parse a tweet and keep it only if it has a place in Spain ('ES'),
        a text, and Spanish ('es') as language.

        :param line: json corresponding to one tweet.
        :return: (text, place) tuple, or None if the tweet is filtered out.
        """
        tweet = json.loads(line)
        place = tweet.get('place')
        # Place may not be informed
        if place is None:
            return None
        tweet_text = tweet.get('text')
        if (place.get('country_code') == 'ES' and tweet_text is not None
                and tweet.get('lang') == 'es'):
            return tweet_text, place
        return None

    def mapper_region_sentiments(self, _, line):
        """
        Map function. The key is the region that contains the tweet and the
        value is the calculated sentiment of the tweet. Tweets that cannot
        be assigned to a region are dropped.

        :param line: json corresponding to each tweet.
        :return: (key, value) tuple.
        """
        located = self._spanish_tweet(line)
        if located is None:
            return
        tweet_text, place = located
        region = self.box_to_region(place.get('bounding_box'))
        if region:
            yield region, self.compute_sentiment(tweet_text)

    @staticmethod
    def mapper_trending(_, line):
        """
        Map function. The key is a trending topic word (the text that
        follows the first '#' of a whitespace-separated token) and the
        value is 1.

        :param line: json corresponding to each tweet
        :return: (key, value) tuple.
        """
        located = MRTweetSentiment._spanish_tweet(line)
        if located is None:
            return
        tweet_text = located[0]
        for word in tweet_text.split():
            if '#' in word:
                hashtag = word.split('#')[1]
                # A lone '#' (or '##...') carries no topic; do not count it.
                if hashtag:
                    yield hashtag, 1

    @staticmethod
    def combiner_sum_values(key, values):
        """
        Combiner function.

        :param key: str
        :param values: iterable of numbers emitted for key.
        :return: (key, value) tuple. Value is the sum of values.
        """
        yield key, sum(values)

    @staticmethod
    def reducer_region_sentiments(region, sentiments):
        """
        Reduce function. It produces a tuple with a region and the sum of the
        corresponding sentiments.

        :param region: str
        :param sentiments: iterable of partial sentiment sums.
        :return: (key, value) tuple.
        """
        yield region, sum(sentiments)

    @staticmethod
    def reducer_sentiments_region(region, sentiments):
        """
        Reduce function. It emits, under the single key None, a tuple with
        the sum of sentiments for a region, and the region, so that the next
        step receives every region in one call.

        :param region: str
        :param sentiments: iterable of partial sentiment sums.
        :return: None, (sum, region) tuple.
        """
        yield None, (sum(sentiments), region)

    @staticmethod
    def reducer_max_sentiment(_, sentiment_region):
        """
        Reduce function. It selects the (score, region) pair with the
        highest score and emits the score as key and the region as value.

        NOTE(review): the other outputs of this job emit the name first and
        the number second; the order is kept here so that the existing
        output format does not change — confirm which one is intended.

        :param _: unused key.
        :param sentiment_region: iterable of (score, region) pairs.
        :return: (score, region) tuple.
        """
        score, region = max(sentiment_region)
        yield score, region

    @staticmethod
    def reducer_trending(word, counts):
        """
        Reduce function. It emits, under the single key None, a tuple with
        the sum of occurrences of a word, and the word.

        :param word: str
        :param counts: iterable of partial counts.
        :return: None, (count, word) tuple.
        """
        yield None, (sum(counts), word)

    @staticmethod
    def reducer_order(_, count_words):
        """
        Reduce function. It collects every word with its occurrences, and
        emits the _LIM_TRENDS most frequent ones, from most to least
        frequent.

        :param _: unused key.
        :param count_words: iterable of (count, word) pairs.
        :return: (word, count) tuple
        """
        top_trends = sorted(count_words, reverse=True)[:_LIM_TRENDS]
        for count, trend in top_trends:
            yield trend, count
# Launch the job only when this file is executed as a script, not when it is
# imported as a module.
if __name__ == '__main__':
    MRTweetSentiment.run()