In [56]:
import numpy as np
import json
import re
from pyspark.sql import *
from pyspark import SparkContext, SQLContext
from pyspark.ml.feature import *
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.sql import functions as F
import pickle

#### Read data

> Description
This dataset contains roughly 20B tweets, from June 2013 to Jan 2016. The tweets are not filtered, and could be roughly 10% of tweets during that time. The format is unicode text with one line per tweet, including language, date and user. Only language detected by twitter as de,en,es,fr,it,nz are included.

In [40]:
sqlContext = SQLContext(sc)

data = sc.textFile("/datasets/tweets-leon")

# Exploratory data analysis

> 1 Handle the data in its size

In [41]:
frist_tweet = data.first()
frist_tweet

u'en\t345963923251539968\tSat Jun 15 18:00:01 +0000 2013\tLetataleta\tRT @silsilfani: the world is not a wish-granting machine. dont be surprised when everything always end up disappointing.'

In [42]:
"""Chose tweets that have exactly 5 components like normal 
         (language, id, date, username, content)"""

def selection_tweet(tweet):
    return len(tweet.split("\t")) == 5

In [43]:
"encode tweet"

def encode_tweet(tweet):
        return [t.encode("utf8") for t in tweet.split("\t")]

In [44]:
encode_tweet(frist_tweet)

['en',
 '345963923251539968',
 'Sat Jun 15 18:00:01 +0000 2013',
 'Letataleta',
 'RT @silsilfani: the world is not a wish-granting machine. dont be surprised when everything always end up disappointing.']

We can chose a sub dataset for working on

In [45]:
data = data.filter(selection_tweet)

en_data = data.filter(lambda x : x[:2]=='en')
de_data = data.filter(lambda x : x[:2]=='de')
es_data = data.filter(lambda x : x[:2]=='es')
fr_data = data.filter(lambda x : x[:2]=='fr')
it_data = data.filter(lambda x : x[:2]=='it')
nz_data = data.filter(lambda x : x[:2]=='nz')

data_2013 = data.filter(lambda tweet : 
                        encode_tweet(tweet)[2][-4:] == '2013')
data_2014 = data.filter(lambda tweet : 
                        encode_tweet(tweet)[2][-4:] == '2014')
data_2015 = data.filter(lambda tweet : 
                        encode_tweet(tweet)[2][-4:] == '2015')
data_2016 = data.filter(lambda tweet : 
                        encode_tweet(tweet)[2][-4:] == '2016')

In [46]:
some_fr_tweets = fr_data.take(5)
some_fr_tweets

[u'fr\t345963923255730176\tSat Jun 15 18:00:01 +0000 2013\t_irem61_\tRT @DHC_Music: Terrorism ... #FreePalestina http://t.co/OLWnVlW682',
 u'fr\t345963923335413761\tSat Jun 15 18:00:01 +0000 2013\tHairCutGroup\tHair by Unihair-boutique\\nhttp://t.co/fRGYMrRcQe http://t.co/7sXXzn1x34',
 u'fr\t345963923465441280\tSat Jun 15 18:00:01 +0000 2013\tChajopico\tRT @KimberleyNoe: " J\'ai rencontr\xe9 un autre. On a pass\xe9 des moments formidables. C\'\xe9tait bien mais ce n\'\xe9tait pas toi. "',
 u'fr\t345963923360604161\tSat Jun 15 18:00:01 +0000 2013\tyousinceforever\tDu coup sa ma blaser de tourner en rond la!',
 u"fr\t345963923398328320\tSat Jun 15 18:00:01 +0000 2013\tGraciaDiamond_\tLa meilleure de #SS7 c'est Ana\xefs sans aucun doute!!! ;-)"]

> 2 Understand what’s into the data (formats, distributions, missing values, correlations, etc.).

In [47]:
some_fr_tweets = [encode_tweet(tweet) for tweet in some_fr_tweets]

In [48]:
print 'Some french tweets:'
for ind, t in enumerate(some_fr_tweets):
    print ind + 1,')User name:',t[3]
    print '         Tweets:', t[4]
    print '         at:', t[2]
    print 

Some french tweets:
1 )User name: _irem61_
         Tweets: RT @DHC_Music: Terrorism ... #FreePalestina http://t.co/OLWnVlW682
         at: Sat Jun 15 18:00:01 +0000 2013

2 )User name: HairCutGroup
         Tweets: Hair by Unihair-boutique\nhttp://t.co/fRGYMrRcQe http://t.co/7sXXzn1x34
         at: Sat Jun 15 18:00:01 +0000 2013

3 )User name: Chajopico
         Tweets: RT @KimberleyNoe: " J'ai rencontré un autre. On a passé des moments formidables. C'était bien mais ce n'était pas toi. "
         at: Sat Jun 15 18:00:01 +0000 2013

4 )User name: yousinceforever
         Tweets: Du coup sa ma blaser de tourner en rond la!
         at: Sat Jun 15 18:00:01 +0000 2013

5 )User name: GraciaDiamond_
         Tweets: La meilleure de #SS7 c'est Anaïs sans aucun doute!!! ;-)
         at: Sat Jun 15 18:00:01 +0000 2013



> 3 Considered ways to enrich, filter, transform the data according to your needs.

# A very simple Topic Modeling using LDA

In [28]:
"""Take only the data in english"""
en_data = data.filter(lambda x : x[:2]=='en')

"""Encode UTF-8"""
en_data = en_data.map(encode_tweet)

"""Take only ID and CONTENT of a tweet"""
tweets = en_data.map(lambda tweet : Row(id=tweet[1], sentence=tweet[4]))

"""Create DF"""
df_tweets = sqlContext.createDataFrame(tweets)

df_tweets.show(3)

+------------------+--------------------+
|                id|            sentence|
+------------------+--------------------+
|345963923251539968|RT @silsilfani: t...|
|345963923297673217|RT @WhosThisHoe: ...|
|345963923259924480|Can't stand peopl...|
+------------------+--------------------+
only showing top 3 rows



In [30]:
"""Tokenization"""
regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="raw", pattern="\\W")
regexTokenized = regexTokenizer.transform(df_tweets)

regexTokenized.show(3)

+------------------+--------------------+--------------------+
|                id|            sentence|                 raw|
+------------------+--------------------+--------------------+
|345963923251539968|RT @silsilfani: t...|[rt, silsilfani, ...|
|345963923297673217|RT @WhosThisHoe: ...|[rt, whosthishoe,...|
|345963923259924480|Can't stand peopl...|[can, t, stand, p...|
+------------------+--------------------+--------------------+
only showing top 3 rows



In [31]:
"""Remove Stop-words"""
remover = StopWordsRemover(inputCol="raw", outputCol="filtered")
removed_stopwords = remover.transform(regexTokenized)

removed_stopwords.show(3)

+------------------+--------------------+--------------------+--------------------+
|                id|            sentence|                 raw|            filtered|
+------------------+--------------------+--------------------+--------------------+
|345963923251539968|RT @silsilfani: t...|[rt, silsilfani, ...|[rt, silsilfani, ...|
|345963923297673217|RT @WhosThisHoe: ...|[rt, whosthishoe,...|[rt, whosthishoe,...|
|345963923259924480|Can't stand peopl...|[can, t, stand, p...|[t, stand, people...|
+------------------+--------------------+--------------------+--------------------+
only showing top 3 rows



In [21]:
"""Lemmatization"""

#TODO

'Lemmatization'

 Warning: Computation of TF-IDF and LDA take a lot of time (1h30 on the cluster)

In [None]:
"""TF-IDF"""

cv = CountVectorizer(inputCol="filtered", outputCol="vectors")
count_vectorizer_model = cv.fit(removed_stopwords)
tf = count_vectorizer_model.transform(removed_stopwords)

idf = IDF(inputCol="vectors", outputCol="tfidf")
idfModel = idf.fit(tf)
tfidf = idfModel.transform(tf)

tfidf.show(3)

In [None]:
"""Topics extraction with LDA"""

nbTopics=100
n_terms=15

corpus = tfidf.select(F.col('id').cast("long"), 'tfidf').rdd.map(lambda x: [x[0], x[1]])
ldaModel = LDA.train(corpus, k=nbTopics)


topics = ldaModel.describeTopics(maxTermsPerTopic=n_terms)
vocabulary = count_vectorizer_model.vocabulary

"""Store result"""
with open("topics.pickle", "wb") as f:
    pickle.dump(topics, f)
with open("vocabulary.pickle", "wb") as f:
    pickle.dump(vocabulary, f)   

In [None]:
"""Load result computed from cluster"""

with open("topics.pickle", "rb") as f:
    topics = pickle.load(f)
    
with open("vocabulary.pickle", "rb") as f:
    vocabulary = pickle.load(f)

In [None]:
for topic in range(len(topics)):
    print("topic {} : ".format(topic))
    words = topics[topic][0]
    scores = topics[topic][1]
    for word in range(len(words)):
        print(vocabulary[words[word]], "-", scores[word])

> 4 Updated your plan in a reasonable way, reflecting your improved knowledge after data acquaintance. In particular, discuss how your data suits your project needs and discuss the methods you’re going to use, giving their essential mathematical details in the notebook

> 5 That your plan for analysis and communication is now reasonable and sound, potentially discussing alternatives to your choices that you considered but dropped.

# How does it work ???

In [57]:
sentenceDataFrame = sqlContext.createDataFrame([
    (0, "Hi I heard about Spark"),
    (1, "I wish Java could use case classes"),
    (2, "Logistic,regression,models,are,neat"),
    (3, "I want a coffee before going to bed"),
    (4, "Today is a big day !!!")
], ["id", "sentence"])

regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words", pattern="\\W")
regexTokenized = regexTokenizer.transform(sentenceDataFrame)

remover = StopWordsRemover(inputCol="words", outputCol="filtered")
removed = remover.transform(regexTokenized)

cv = CountVectorizer(inputCol="filtered", outputCol="vectors")
count_vectorizer_model = cv.fit(removed)
tf = count_vectorizer_model.transform(removed)


idf = IDF(inputCol="vectors", outputCol="tfidf")
idfModel = idf.fit(tf)
tfidf = idfModel.transform(tf)

nbTopics=3
n_terms=3

corpus = tfidf.select(F.col('id').cast("long"), 'tfidf').rdd.map(lambda x: [x[0], x[1]])
ldaModel = LDA.train(corpus, k=nbTopics)


# extracting topics
topics = ldaModel.describeTopics(maxTermsPerTopic=n_terms)
# extraction vocabulary
vocabulary = count_vectorizer_model.vocabulary
file = open("testfile.txt","w")
for topic in range(len(topics)):
    print("topic {} : ".format(topic))
    file.write("topic {} : \n".format(topic)) 
    words = topics[topic][0]
    scores = topics[topic][1]
    for word in range(len(words)):
        file.write("{} - {}\n".format(vocabulary[words[word]], scores[word]))
        print(vocabulary[words[word]], "-", scores[word])
file.close()

topic 0 : 
(u'going', '-', 0.05409342650763188)
(u'bed', '-', 0.05407417503427965)
(u'want', '-', 0.05401486983908868)
topic 1 : 
(u'java', '-', 0.05407759673070426)
(u'classes', '-', 0.05390088196503407)
(u'case', '-', 0.053459237515048524)
topic 2 : 
(u'today', '-', 0.0532383963697219)
(u'day', '-', 0.05306853182395553)
(u'hi', '-', 0.052806852808286335)
