# LDA TOPIC MODELING

This notebook applies LDA modeling to a dataset of news headlines:

https://www.kaggle.com/therohk/million-headlines

It runs on Dataproc with PySpark:

https://cloud.google.com/dataproc/

It follows this post:

https://medium.com/@connectwithghosh/topic-modelling-with-latent-dirichlet-allocation-lda-in-pyspark-2cb3ebd5678e

and the Apache Spark documentation:

- http://spark.apache.org/docs/2.2.0/api/python/pyspark.ml.html#pyspark.ml.clustering.LDA

- https://spark.apache.org/docs/2.1.0/ml-clustering.html#latent-dirichlet-allocation-lda



In [2]:
import re
import string

import nltk
import numpy as np
import pandas as pd
import pyspark
from nltk.corpus import stopwords
from pyspark.ml.clustering import LDA, LDAModel
#from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.ml.feature import CountVectorizer, IDF, Tokenizer
from pyspark.mllib.linalg import Vector, Vectors
from pyspark.sql import Row, SQLContext
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.functions import col, size, struct, udf

# `sc` is assumed to be the SparkContext already provided by the notebook kernel
sqlContext = SQLContext(sc)

In [3]:
sc

In [4]:
data = spark.read.csv("gs://ei-db/abcnews-date-text.csv",header=True)

In [5]:
data.count()

1103665

In [6]:
data.show()

+------------+--------------------+
|publish_date|       headline_text|
+------------+--------------------+
|    20030219|aba decides again...|
|    20030219|act fire witnesse...|
|    20030219|a g calls for inf...|
|    20030219|air nz staff in a...|
|    20030219|air nz strike to ...|
|    20030219|ambitious olsson ...|
|    20030219|antic delighted w...|
|    20030219|aussie qualifier ...|
|    20030219|aust addresses un...|
|    20030219|australia is lock...|
|    20030219|australia to cont...|
|    20030219|barca take record...|
|    20030219|bathhouse plans m...|
|    20030219|big hopes for lau...|
|    20030219|big plan to boost...|
|    20030219|blizzard buries u...|
|    20030219|brigadier dismiss...|
|    20030219|british combat tr...|
|    20030219|bryant leads lake...|
|    20030219|bushfire victims ...|
+------------+--------------------+
only showing top 20 rows



__Transform the headline_text column to an RDD to work with__

In [7]:
texts=data.rdd.map(lambda x: x['headline_text'])

In [8]:
texts.take(5)

['aba decides against community broadcasting licence',
 'act fire witnesses must be aware of defamation',
 'a g calls for infrastructure protection summit',
 'air nz staff in aust strike for pay rise',
 'air nz strike to affect australian travellers']

__Stopwords list, imported from NLTK library__

In [9]:
nltk.download('stopwords')  # the corpus must be downloaded before first use
stopwords = stopwords.words("english")  # note: rebinds the imported `stopwords` name to a list of words

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


In [10]:
stopwords[:10]

['i', 'me', 'my', 'myself', 'we', 'our', 'ours', 'ourselves', 'you', "you're"]

__Tokenizing__

Each headline is stripped of leading and trailing whitespace, converted to lowercase, and split on spaces into a list of words. Words that are not purely alphabetic, words of three letters or fewer, and stopwords are then removed. Finally, an index is added to each element.

In [11]:
tokens = texts \
    .map( lambda document: document.strip().lower()) \
    .map( lambda document: re.split(" ", document)) \
    .map( lambda word: [x for x in word if x.isalpha()]) \
    .map( lambda word: [x for x in word if len(x) > 3] ) \
    .map( lambda word: [x for x in word if x not in stopwords]) \
    .zipWithIndex()
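
The filter chain above can be tried on a single headline without Spark. The following is a minimal plain-Python sketch of the same steps; `tokenize` and `SAMPLE_STOPWORDS` are hypothetical names, and the small stopword set only stands in for the NLTK list used in the notebook.

```python
import re

# Hypothetical stand-in for the NLTK English stopword list used in the notebook.
SAMPLE_STOPWORDS = {"against", "must", "be", "of", "for", "in", "to"}


def tokenize(headline, stop_words=SAMPLE_STOPWORDS):
    """Apply the same steps as the RDD pipeline to one headline."""
    # Strip surrounding whitespace, lowercase, and split on single spaces.
    words = re.split(" ", headline.strip().lower())
    # Keep purely alphabetic words only.
    words = [w for w in words if w.isalpha()]
    # Keep words longer than three letters.
    words = [w for w in words if len(w) > 3]
    # Drop stopwords.
    return [w for w in words if w not in stop_words]


print(tokenize("aba decides against community broadcasting licence"))
# ['decides', 'community', 'broadcasting', 'licence']
```

Note that `aba` is dropped by the length filter, not by the stopword filter.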

__Creating dataframe__

In [12]:
df_txts = sqlContext.createDataFrame(tokens, ["list_of_words",'index'])

In [13]:
df_txts.show()

+--------------------+-----+
|       list_of_words|index|
+--------------------+-----+
|[decides, communi...|    0|
|[fire, witnesses,...|    1|
|[calls, infrastru...|    2|
|[staff, aust, str...|    3|
|[strike, affect, ...|    4|
|[ambitious, olsso...|    5|
|[antic, delighted...|    6|
|[aussie, qualifie...|    7|
|[aust, addresses,...|    8|
|[australia, locke...|    9|
|[australia, contr...|   10|
|[barca, take, rec...|   11|
|[bathhouse, plans...|   12|
|[hopes, launcesto...|   13|
|[plan, boost, par...|   14|
|[blizzard, buries...|   15|
|[brigadier, dismi...|   16|
|[british, combat,...|   17|
|[bryant, leads, l...|   18|
|[bushfire, victim...|   19|
+--------------------+-----+
only showing top 20 rows



__TF-IDF Matrix__

The DataFrame built from the RDD has two columns: the list of words and the index. CountVectorizer takes this data and appends a column of sparse term-count vectors to the original DataFrame. IDF works the same way, appending a column in which those counts are rescaled by inverse document frequency.

In [14]:
# TF
# Vocabulary capped at 20000 terms; minDF=10.0 keeps only terms that appear in at least 10 documents
cv = CountVectorizer(inputCol="list_of_words", outputCol="raw_features",vocabSize=20000, minDF=10.0)
cvmodel = cv.fit(df_txts)

result_cv = cvmodel.transform(df_txts)
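
To make the roles of `vocabSize` and `minDF` more concrete, here is a rough plain-Python sketch of the idea: terms found in too few documents are discarded, and the remaining terms are ranked by corpus frequency and truncated. The function names and the alphabetical tie-breaking are assumptions made for illustration, not Spark's implementation.

```python
from collections import Counter


def build_vocabulary(docs, vocab_size, min_df):
    """Rank terms by corpus frequency, keeping those in at least min_df documents."""
    term_counts = Counter(term for doc in docs for term in doc)
    doc_counts = Counter(term for doc in docs for term in set(doc))
    eligible = [t for t in term_counts if doc_counts[t] >= min_df]
    # Most frequent first; ties broken alphabetically (an assumption for determinism).
    eligible.sort(key=lambda t: (-term_counts[t], t))
    return eligible[:vocab_size]


def count_vector(doc, vocabulary):
    """Return a sparse {term index: count} mapping for one document."""
    index = {term: i for i, term in enumerate(vocabulary)}
    counts = Counter(index[t] for t in doc if t in index)
    return dict(sorted(counts.items()))


docs = [["police", "fire"], ["police", "court"], ["police", "fire"], ["rain"]]
vocabulary = build_vocabulary(docs, vocab_size=20000, min_df=2)
print(vocabulary)                                            # ['police', 'fire']
print(count_vector(["police", "fire", "rain"], vocabulary))  # {0: 1, 1: 1}
```

Terms outside the vocabulary, such as `rain` here, simply do not appear in the resulting vector.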

In [15]:
# IDF
idf = IDF(inputCol="raw_features", outputCol="features")
idfModel = idf.fit(result_cv)
result_tfidf = idfModel.transform(result_cv) 
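
The rescaling done by IDF can be written out by hand. The Spark documentation gives the inverse document frequency as log((m + 1) / (df + 1)), where m is the number of documents and df the number of documents containing the term. The sketch below applies that formula to a tiny hypothetical corpus in plain Python; `idf_weights` and `tfidf` are illustrative names.

```python
import math
from collections import Counter


def idf_weights(docs):
    """Inverse document frequency per term: log((m + 1) / (df + 1))."""
    m = len(docs)
    doc_freq = Counter(term for doc in docs for term in set(doc))
    return {term: math.log((m + 1) / (df + 1)) for term, df in doc_freq.items()}


def tfidf(doc, idf):
    """Multiply each raw term count by the term's IDF weight."""
    return {term: count * idf[term] for term, count in Counter(doc).items()}


# Hypothetical toy corpus, already tokenized.
docs = [["fire", "witnesses"], ["fire", "strike"], ["strike", "staff", "strike"]]
idf = idf_weights(docs)
print(tfidf(docs[2], idf))
```

Rare terms such as `staff` receive a larger weight than terms that occur in most documents.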

In [16]:
result_tfidf.show()

+--------------------+-----+--------------------+--------------------+
|       list_of_words|index|        raw_features|            features|
+--------------------+-----+--------------------+--------------------+
|[decides, communi...|    0|(20000,[122,1122,...|(20000,[122,1122,...|
|[fire, witnesses,...|    1|(20000,[6,585,112...|(20000,[6,585,112...|
|[calls, infrastru...|    2|(20000,[21,617,10...|(20000,[21,617,10...|
|[staff, aust, str...|    3|(20000,[75,187,21...|(20000,[75,187,21...|
|[strike, affect, ...|    4|(20000,[14,187,16...|(20000,[14,187,16...|
|[ambitious, olsso...|    5|(20000,[40,1675,1...|(20000,[40,1675,1...|
|[antic, delighted...|    6|(20000,[99,1911,2...|(20000,[99,1911,2...|
|[aussie, qualifie...|    7|(20000,[166,209,4...|(20000,[166,209,4...|
|[aust, addresses,...|    8|(20000,[4,78,151,...|(20000,[4,78,151,...|
|[australia, locke...|    9|(20000,[7,3054,48...|(20000,[7,3054,48...|
|[australia, contr...|   10|(20000,[7,78,451,...|(20000,[7,78,451,...|
|[barc

__Dataframe to model__

In [17]:
df_model=result_tfidf.select('index','list_of_words','features')

In [18]:
df_model.show()

+-----+--------------------+--------------------+
|index|       list_of_words|            features|
+-----+--------------------+--------------------+
|    0|[decides, communi...|(20000,[122,1122,...|
|    1|[fire, witnesses,...|(20000,[6,585,112...|
|    2|[calls, infrastru...|(20000,[21,617,10...|
|    3|[staff, aust, str...|(20000,[75,187,21...|
|    4|[strike, affect, ...|(20000,[14,187,16...|
|    5|[ambitious, olsso...|(20000,[40,1675,1...|
|    6|[antic, delighted...|(20000,[99,1911,2...|
|    7|[aussie, qualifie...|(20000,[166,209,4...|
|    8|[aust, addresses,...|(20000,[4,78,151,...|
|    9|[australia, locke...|(20000,[7,3054,48...|
|   10|[australia, contr...|(20000,[7,78,451,...|
|   11|[barca, take, rec...|(20000,[94,99,921...|
|   12|[bathhouse, plans...|(20000,[66,149,23...|
|   13|[hopes, launcesto...|(20000,[214,1560,...|
|   14|[plan, boost, par...|(20000,[8,9,41,23...|
|   15|[blizzard, buries...|(20000,[529,897,2...|
|   16|[brigadier, dismi...|(20000,[368,373,1...|


__Defining the model__

This cell sets the number of topics (k) and the maximum number of iterations (maxIter).

In [19]:
num_topics=40
max_iterations=50
lda_model = LDA(k=num_topics, maxIter=max_iterations)

__Running the model__

Fitting can take quite a long time, depending on the number of workers and the cluster configuration.

In [20]:
model=lda_model.fit(df_model)

__The model structure can be inspected with a few commands__

In [21]:
model.isDistributed()

False

In [22]:
model.vocabSize()

20000

__Model description__

In [23]:
model.describeTopics(5).show()

+-----+--------------------+--------------------+
|topic|         termIndices|         termWeights|
+-----+--------------------+--------------------+
|    0|[90, 137, 119, 15...|[0.02632232113767...|
|    1|[102, 6, 166, 278...|[0.02838041859878...|
|    2|[120, 179, 191, 2...|[0.02522239735892...|
|    3|[22, 62, 44, 75, ...|[0.04394980978980...|
|    4|[57, 63, 176, 221...|[0.03466358342920...|
|    5|[10, 38, 150, 82,...|[0.03785823626888...|
|    6|[76, 114, 188, 20...|[0.03493538183775...|
|    7|[4, 16, 86, 68, 164]|[0.05356878726660...|
|    8|[13, 17, 46, 98, ...|[0.04449739467222...|
|    9| [29, 32, 23, 0, 89]|[0.03625447607374...|
|   10|[31, 118, 172, 18...|[0.04255495142358...|
|   11|[27, 74, 131, 142...|[0.04392949045335...|
|   12|[35, 87, 161, 232...|[0.03886320030379...|
|   13|[56, 165, 189, 24...|[0.03765557595745...|
|   14|[55, 24, 296, 362...|[0.04443694874912...|
|   15|[149, 363, 426, 3...|[0.02509242763643...|
|   16|[40, 110, 30, 376...|[0.04050425088939...|


In [24]:
model.describeTopics().first()

Row(topic=0, termIndices=[90, 137, 119, 155, 170, 168, 187, 197, 202, 134], termWeights=[0.026322321137677046, 0.02236724290301282, 0.021793873579204585, 0.021321143954683378, 0.019888977336663942, 0.01952432602468721, 0.019457027836684716, 0.018889900458627013, 0.01825923098918701, 0.01764186447082078])
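
The `termIndices` shown above are positions in the CountVectorizer vocabulary, so they can be turned back into words by a simple lookup. The sketch below shows the idea in plain Python; `terms_for_topic` and the sample vocabulary are hypothetical, and in this notebook the real list would presumably come from `cvmodel.vocabulary`.

```python
def terms_for_topic(term_indices, vocabulary):
    """Map vocabulary positions back to the words they stand for."""
    return [vocabulary[i] for i in term_indices]


# Hypothetical vocabulary for illustration only;
# in the notebook it would be: vocabulary = cvmodel.vocabulary
sample_vocabulary = ["police", "court", "fire", "water", "council"]

print(terms_for_topic([2, 0, 4], sample_vocabulary))
# ['fire', 'police', 'council']
```

Applying this to every row of `model.describeTopics()` gives a readable list of top words per topic.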

In [25]:
model.topicsMatrix()

DenseMatrix(20000, 40, [2151.9187, 4402.0983, 5805.5954, 246.8059, 1635.5771, 2.133, 874.6418, 478.9826, ..., 0.258, 0.2558, 0.2896, 86.7744, 0.2288, 0.2195, 0.2577, 0.3562], 0)

__Getting the dataframe with index, topics and weights__

In [26]:
transformed = model.transform(df_model)

In [27]:
transformed.show()

+-----+--------------------+--------------------+--------------------+
|index|       list_of_words|            features|   topicDistribution|
+-----+--------------------+--------------------+--------------------+
|    0|[decides, communi...|(20000,[122,1122,...|[7.87046718762269...|
|    1|[fire, witnesses,...|(20000,[6,585,112...|[7.15877969844703...|
|    2|[calls, infrastru...|(20000,[21,617,10...|[0.51560511134553...|
|    3|[staff, aust, str...|(20000,[75,187,21...|[0.25712201845998...|
|    4|[strike, affect, ...|(20000,[14,187,16...|[0.22051273417340...|
|    5|[ambitious, olsso...|(20000,[40,1675,1...|[8.35689422867455...|
|    6|[antic, delighted...|(20000,[99,1911,2...|[8.14157283724233...|
|    7|[aussie, qualifie...|(20000,[166,209,4...|[4.62855558780362...|
|    8|[aust, addresses,...|(20000,[4,78,151,...|[8.48765740543258...|
|    9|[australia, locke...|(20000,[7,3054,48...|[0.00113862618909...|
|   10|[australia, contr...|(20000,[7,78,451,...|[9.60202707624237...|
|   11

In [28]:
transformed.first()

Row(index=0, list_of_words=['decides', 'community', 'broadcasting', 'licence'], features=SparseVector(20000, {122: 5.495, 1122: 7.064, 5537: 8.9797, 10076: 9.9439}), topicDistribution=DenseVector([0.0008, 0.0008, 0.0008, 0.2188, 0.2803, 0.0008, 0.0008, 0.1652, 0.308, 0.0008, 0.0008, 0.0008, 0.0008, 0.0008, 0.0007, 0.0008, 0.0008, 0.0008, 0.0008, 0.0008, 0.0008, 0.0008, 0.0008, 0.0008, 0.0008, 0.0008, 0.0008, 0.0008, 0.0008, 0.0008, 0.0008, 0.0008, 0.0008, 0.0008, 0.0007, 0.0008, 0.0008, 0.0008, 0.0008, 0.0008]))
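
Each `topicDistribution` holds one weight per topic, so the most relevant topics for a headline are the positions with the largest weights. Below is a small plain-Python sketch, assuming the vector has first been converted to a list or array (a `DenseVector` offers `toArray()` for that); `top_topics` is a hypothetical helper, and the sample values are the first ten entries of the row printed above.

```python
def top_topics(distribution, n=3):
    """Return the n (topic, weight) pairs with the largest weights."""
    ranked = sorted(enumerate(distribution), key=lambda pair: pair[1], reverse=True)
    return ranked[:n]


# First ten weights of the topicDistribution for index 0, as printed above.
first_ten = [0.0008, 0.0008, 0.0008, 0.2188, 0.2803,
             0.0008, 0.0008, 0.1652, 0.308, 0.0008]

print(top_topics(first_ten))
# [(8, 0.308), (4, 0.2803), (3, 0.2188)]
```

For this headline the weight is concentrated in topics 8, 4, 3 and 7, while the remaining topics stay near 0.0008.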

__Merging dataframes to get the topics and weights for each headline__

In [29]:
# Add an index to each headline so it can be joined with the model output
headlines=texts.zipWithIndex()

In [30]:
#Creating dataframe
df_headlines = sqlContext.createDataFrame(headlines, ["headlines",'index'])

In [31]:
df_headlines.show()

+--------------------+-----+
|           headlines|index|
+--------------------+-----+
|aba decides again...|    0|
|act fire witnesse...|    1|
|a g calls for inf...|    2|
|air nz staff in a...|    3|
|air nz strike to ...|    4|
|ambitious olsson ...|    5|
|antic delighted w...|    6|
|aussie qualifier ...|    7|
|aust addresses un...|    8|
|australia is lock...|    9|
|australia to cont...|   10|
|barca take record...|   11|
|bathhouse plans m...|   12|
|big hopes for lau...|   13|
|big plan to boost...|   14|
|blizzard buries u...|   15|
|brigadier dismiss...|   16|
|british combat tr...|   17|
|bryant leads lake...|   18|
|bushfire victims ...|   19|
+--------------------+-----+
only showing top 20 rows



In [32]:
merged=df_headlines.join(transformed, on=['index'],how='left')

In [33]:
merged.show()

+-----+--------------------+--------------------+--------------------+--------------------+
|index|           headlines|       list_of_words|            features|   topicDistribution|
+-----+--------------------+--------------------+--------------------+--------------------+
|   26|commonwealth bank...|[commonwealth, ba...|(20000,[30,142,22...|[5.48684415763207...|
|   29|councillor to con...|[councillor, cont...|(20000,[848,1214,...|[8.54094794320740...|
|  474|bracks backs ruli...|[bracks, backs, r...|(20000,[65,107,10...|[5.91045684361068...|
|  964|all ords jumps 33...|[ords, jumps, poi...|(20000,[622,1358,...|[8.08410458255682...|
| 1677|tour of valencia ...|[tour, valencia, ...|(20000,[33,177,62...|[7.39797624391325...|
| 1697|victorian silk fa...|[victorian, silk,...|(20000,[3,25,138,...|[6.30737653783935...|
| 1806|jail for firefigh...|[jail, firefighte...|(20000,[128,2952,...|[8.11545131403488...|
| 1950|brogden would eas...|[brogden, would, ...|(20000,[281,298,8...|[0.1621309

__Saving the model__

In [None]:
model.save('gs://bucket_name/LDAModel')