# Massive Data Processing







## Spark practice

<b>Student: David Sánchez</b>

In [1]:
import pyspark
sc = pyspark.SparkContext('local[*]')

### 1.-Practice: Trending Topics & analysis sentiment (5.0%)


#### Trending Topics

In [2]:
import os, shutil
import json
import unicodedata
from operator import add

if os.path.exists("Results/Tweets_ES.json"): 
    shutil.rmtree("Results/Tweets_ES.json")

input = sc.textFile("Datasets/Tweets/tweets_es.json")
tweets = input.map(lambda x: json.loads(x))
print "Total tweets: ", tweets.count()

tweets_es = tweets.filter(lambda t: "es" in t["lang"])
print "Spanish tweets: ", tweets_es.count()

tweets_es_hashtags = tweets_es.filter(lambda t: t["entities"]["hashtags"] != [] )
print  "Tweets with hashtags: ", tweets_es_hashtags.count()

hashtags = tweets_es_hashtags.flatMap(lambda t: map(lambda h: (unicodedata.normalize('NFKD', h["text"]).encode('ascii','ignore'),1), t["entities"]["hashtags"]))
print "Hashtags: ", hashtags.count()

trending_hashtags = hashtags.reduceByKey(lambda a, b: a + b)
print "Hashtags reduced: ", trending_hashtags.count()

if os.path.exists("results/trending"): 
    shutil.rmtree("results/trending")
trending_hashtags.saveAsTextFile('results/trending')
print "Files saved: 'results/trending'"
print trending_hashtags.take(10)


Total tweets:  19166
Spanish tweets:  15028
Tweets with hashtags:  3495
Hashtags:  5286
Hashtags reduced:  2947
[('VineDeLaAbuela', 1), ('', 1), ('PaisajeCulturalCafetero', 1), ('RMUCL', 11), ('PorSiNoLoViste', 2), ('industria40', 1), ('Mazatepec', 1), ('wallapop', 1), ('29A', 1), ('SomosServidoresPublicos', 2)]


#### Top N

In [7]:
trending_sorted = trending_hashtags.takeOrdered(10, key=lambda t: -t[1])
print trending_sorted

if os.path.exists("results/topn"): 
    shutil.rmtree("results/topn")
sc.parallelize(trending_sorted).saveAsTextFile('results/topn')
print "Files saved: 'results/topn'"

[('MTVMiaw', 196), ('Vota2ParaQueSigaPresidente', 117), ('FelizMartes', 84), ('TuitUtil', 65), ('LibertadDePrensa', 64), ('TeCaesTeLevantasFelices27Mica', 51), ('MTVSnapMexDanna', 39), ('MTVPopMBautista', 37), ('MtvIconoMBautista', 34), ('DebateReal', 34)]
Files saved: 'results/topn'


#### Sentiment

In [142]:
tweets_temp = tweets_es_hashtags.map(lambda t: (t["text"],t["id"],t["entities"]["hashtags"]))
print tweets_temp.take(1)

[(u'@DisneySpain @TiniStoessel LIBERA LOGOOO \n#TINI', 727497968731959296, [{u'indices': [42, 47], u'text': u'TINI'}])]


In [21]:
# Only tweets text
tweets_text = tweets_es_hashtags.map(lambda t: unicodedata.normalize('NFKD',t["text"]).encode('ascii','ignore').lower())
print tweets_text.take(2)

['@disneyspain @tinistoessel libera logooo \n#tini', 'rt @amctv_es: el grupo esta separado... conseguiran escapar? llegaran a baja? el proximo lunes nuevo episodio! #feartwd\nhttps://t.co/vqw...']


In [25]:
# Number of words
tweets_words_count = tweets_es_hashtags.map(lambda t: (t["text"], t["text"].split(" "))).mapValues(lambda x: len(x))
print tweets_words.take(2)

[('@disneyspain @tinistoessel libera logooo \n#tini', 5), ('rt @amctv_es: el grupo esta separado... conseguiran escapar? llegaran a baja? el proximo lunes nuevo episodio! #feartwd\nhttps://t.co/vqw...', 17)]


In [26]:
tweets_words_count2 = tweets_text.map(lambda t: (t, t.split(" "))).mapValues(lambda x: len(x))
print tweets_words.take(2)

[('@disneyspain @tinistoessel libera logooo \n#tini', 5), ('rt @amctv_es: el grupo esta separado... conseguiran escapar? llegaran a baja? el proximo lunes nuevo episodio! #feartwd\nhttps://t.co/vqw...', 17)]


In [107]:
# Hashtags
tweets_hashtag = tweets_es_hashtags.map(lambda t: (t["text"], map(lambda h: h["text"], t["entities"]["hashtags"])))\
                    .flatMapValues(lambda x: x)
print tweets_hashtag.take(2)

[(u'@DisneySpain @TiniStoessel LIBERA LOGOOO \n#TINI', u'TINI'), (u'RT @amctv_es: El grupo est\xe1 separado... \xbfconseguir\xe1n escapar? \xbfllegar\xe1n a Baja? \xa1El pr\xf3ximo lunes nuevo episodio! #FearTWD\nhttps://t.co/vQW\u2026', u'FearTWD')]


In [None]:
# Positive words

In [90]:
file_positive = sc.textFile("Dictionary/positive_words_es.txt")
file_negative = sc.textFile("Dictionary/negative_words_es.txt")
positive_words = file_positive.map(lambda w: w.encode('ascii', 'ignore'))
negative_words = file_negative.map(lambda w: w.encode('ascii', 'ignore'))
positive_words_list = positive_words.collect()
negative_words_list = negative_words.collect()

print "Positive: ", positive_words.take(10), "..."
print "Negative: ", negative_words.take(10), "..."

Positive:  ['libera', 'como', 'gran', 'mayor', 'nuevo', 'general', 'obra', 'principal', 'bien', 'poco'] ...
Negative:  ['divisas', 'en', 'para', 'sin', 'tiempo', 'bajo', 'varios', 'tipo', 'largo', 'solo'] ...


In [85]:
tweets_words = tweets_text.map(lambda t: (t, t.split(" "))).flatMapValues(lambda x: x)
print tweets_words.take(6)

[('@disneyspain @tinistoessel libera logooo \n#tini', '@disneyspain'), ('@disneyspain @tinistoessel libera logooo \n#tini', '@tinistoessel'), ('@disneyspain @tinistoessel libera logooo \n#tini', 'libera'), ('@disneyspain @tinistoessel libera logooo \n#tini', 'logooo'), ('@disneyspain @tinistoessel libera logooo \n#tini', '\n#tini'), ('rt @amctv_es: el grupo esta separado... conseguiran escapar? llegaran a baja? el proximo lunes nuevo episodio! #feartwd\nhttps://t.co/vqw...', 'rt')]


In [86]:
tweets_positive = tweets_words.map(lambda t: (t[0], 1 if t[1] in positive_words_list else 0))
print tweets_positive.take(6)

[('@disneyspain @tinistoessel libera logooo \n#tini', 0), ('@disneyspain @tinistoessel libera logooo \n#tini', 0), ('@disneyspain @tinistoessel libera logooo \n#tini', 1), ('@disneyspain @tinistoessel libera logooo \n#tini', 0), ('@disneyspain @tinistoessel libera logooo \n#tini', 0), ('rt @amctv_es: el grupo esta separado... conseguiran escapar? llegaran a baja? el proximo lunes nuevo episodio! #feartwd\nhttps://t.co/vqw...', 0)]


In [87]:
tweets_positive_count = tweets_positive.reduceByKey(add)
print tweets_positive_count.take(5)

[('hasta nunca #adiosradiomarcaholaverdad', 0), ('rt @fornerinojl: el estado venezolano le asigna a empresas polar las divisas y lorenzo #mendozasabotealapatria .@nicolasmaduro https://t.co...', 0), ('sigue las redes sociales de #micausamimega y preparate porque pronto te sorprenderas https://t.co/1hemf4ujwj', 1), ('rt @fnaraujor: asi recibe cartagena hoy a santos con pancartas y rechiflas #santoscolombianotequiere https://t.co/il9vcb8ffo', 0), ('@yaparate40  #buenmartes nos encantaria escuchar a @pabloalboran #pasosdecero esperando junio! @legalborannormx https://t.co/1xyzhdznyu', 0)]


In [91]:
tweets_negative = tweets_words.map(lambda t: (t[0], 1 if t[1] in negative_words_list else 0))
tweets_negative_count = tweets_negative.reduceByKey(add)
print tweets_negative_count.take(5)

[('hasta nunca #adiosradiomarcaholaverdad', 0), ('rt @fornerinojl: el estado venezolano le asigna a empresas polar las divisas y lorenzo #mendozasabotealapatria .@nicolasmaduro https://t.co...', 1), ('sigue las redes sociales de #micausamimega y preparate porque pronto te sorprenderas https://t.co/1hemf4ujwj', 0), ('rt @fnaraujor: asi recibe cartagena hoy a santos con pancartas y rechiflas #santoscolombianotequiere https://t.co/il9vcb8ffo', 0), ('@yaparate40  #buenmartes nos encantaria escuchar a @pabloalboran #pasosdecero esperando junio! @legalborannormx https://t.co/1xyzhdznyu', 0)]


In [None]:
# Union

In [79]:
tweets_hashtag = tweets_es_hashtags.map(lambda t: (unicodedata.normalize('NFKD',t["text"]).encode('ascii','ignore').lower(), \
                map(lambda h: unicodedata.normalize('NFKD',h["text"]).encode('ascii','ignore').lower(), t["entities"]["hashtags"])))\
                .flatMapValues(lambda x: x)
print tweets_hashtag.take(2)

[('@disneyspain @tinistoessel libera logooo \n#tini', 'tini'), ('rt @amctv_es: el grupo esta separado... conseguiran escapar? llegaran a baja? el proximo lunes nuevo episodio! #feartwd\nhttps://t.co/vqw...', 'feartwd')]
