In [1]:
import findspark
findspark.init("/usr/hdp/2.6.5.0-292/spark2")
import pyspark
sc = pyspark.SparkContext()

In [2]:
import pandas as pd
import pyspark
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [3]:
from nltk.corpus import stopwords
import re as re

In [4]:
from pyspark.ml.feature import CountVectorizer , IDF

In [5]:
from pyspark.mllib.linalg import Vector, Vectors
from pyspark.mllib.clustering import LDA, LDAModel

In [6]:
sqlContext.read.csv('/user/maria_dev/data/newsarticles_article.csv', sep=',', escape='"', header=True, 
               inferSchema=True, multiLine=True).count()

465160

In [7]:
data = sqlContext.read.csv('/user/maria_dev/data/newsarticles_article.csv', sep=',', escape='"', header=False, 
               inferSchema=True, multiLine=True)

In [8]:
data.head()

Row(_c0=122140, _c1='A', _c2='http://abclocal.go.com/wls/story?section=news/local&id=9240864&rss=rss-wls-article-9240864', _c3=None, _c4='Ventra cards available for all users Monday | abc7chicago.com', _c5="![][1]\n\nSeptember 8, 2013 (CHICAGO) (WLS) --  Monday is a big day for the Chicago\nTransit Authority and Pace as they roll out the Ventra Cards for all riders.\n\nVentra was rolled out to U-Pass Chicago card holders and Chicago Public School\nstudents first.\n\nNow, all CTA users will be able to use the new system.\n\n####  Related Content\n\n[ Story:  CTA's Ventra cards in service Monday ][2]\n\n[ Story:  Ventra cards for sale online, by phone, at stores ][3]\n\n[ Story:  CTA waives Ventra card's $5 one-time fee ][4]\n\n[ Story:  CTA says it has fixed Ventra fare card glitch ][5]\n\n[ Story:  Ventra card use starting on CTA, Pace ][6]\n\n[ Story:  CTA, Pace to launch Ventra payment system ][7]\n\n[ Story:  August launch for CTA, Pace Ventra system ][8]\n\n[ Story:  CTA defends Ve

In [9]:
data2 = data.sample(False, 0.05, seed=0)
data2.count()

23331

In [10]:
data= None

In [11]:
reviews = data2.rdd.map(lambda x : x['_c5']).filter(lambda x: x is not None)

In [12]:
reviews.top(1)

['• ** Felicia the ferret ** helped clean the lab’s long, narrow pipes during\nthe ’70s. She would scurry through a pipe with a string tied to her harness.\nWhen she came out the other end, staff attached a giant cotton swab to the\nstring and pulled it back through.\n\n• In 1992, Fermilab launched the ** third website ever ** . The World Wide Web\nwas created, after all, to exchange particle physics data.\n\n• A ** 50-foot-wide electromagnet ** used for collecting data on subatomic\nparticles made a 35-day journey from Long Island by barge and flatbed truck in\n2013. It barely cleared the sides of the toll arches along Interstate 355 when\nit got to Illinois.\n\n• Contrary to rumors, the ** bison ** at the lab weren’t introduced to\nforewarn of dangerous levels of radioactivity but to serve as a metaphor for\nthe frontier of high-energy physics.\n\n_ Find out more about Fermilab’s\xa0Family Day on February 12 [ here ][1] . _\n\nThis article appears in the [ February\xa02017 ][2] issue

In [13]:
import nltk
nltk.download('stopwords')
StopWords = stopwords.words("english")

[nltk_data] Downloading package stopwords to
[nltk_data]     /home/maria_dev/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [14]:
tokens = reviews.map( lambda document: document.replace('\r', '').replace('\n', ' ').replace('\xa0', ' ').strip().lower()).map( lambda document: re.split(" ", document)).map( lambda word: [x for x in word if x.isalpha()]).map( lambda word: [x for x in word if len(x) > 3] ).map( lambda word: [x for x in word if x not in StopWords]).zipWithIndex()

In [15]:
df_txts = sqlContext.createDataFrame(tokens, ["list_of_words",'index'])

In [16]:
cv = CountVectorizer(inputCol="list_of_words", outputCol="raw_features", vocabSize=5000, minDF=10.0)
cvmodel = cv.fit(df_txts)
result_cv = cvmodel.transform(df_txts)

In [17]:
idf = IDF(inputCol="raw_features", outputCol="features")
idfModel = idf.fit(result_cv)
result_tfidf = idfModel.transform(result_cv) 

In [18]:
result_tfidf.take(10)

[Row(list_of_words=['city', 'ordinance', 'brings', 'changes', 'public', 'transit', 'monday', 'september', 'politics', 'story', 'reporter', 'bill', 'cameron', 'city', 'aldermen', 'reluctantly', 'gutted', 'former', 'mayor', 'richard', 'ordinance', 'order', 'bring', 'compliance', 'concealed', 'carry', 'ordinance', 'still', 'remaining', 'requirement', 'owners', 'home', 'keep', 'guns', 'secure', 'either', 'locked', 'trigger', 'debate', 'alderman', 'burke', 'maneuvered', 'lobbyist', 'todd', 'vandermyde', 'opposing', 'blanket', 'rule', 'generally', 'burke', 'oppose', 'ordinance', 'tries', 'increase', 'safety', 'homes', 'oppose', 'safety', 'measures', 'impediment', 'lawful', 'wrinkle', 'ordinance', 'requires', 'movie', 'makers', 'theaters', 'using', 'guns', 'armourer', 'million', 'dollars', 'liability', 'aldermen', 'enhanced', 'penalties', 'violations', 'around', 'buses', 'around', 'stops', 'even', 'vandermyde', 'objects', 'prohibitions', 'guns', 'vandermyde', 'increased', 'penalties', 'illega

In [19]:
num_topics = 15
max_iterations = 10
lda_model = LDA.train(result_tfidf.select('index','features').rdd.mapValues(Vectors.fromML).map(list), k=num_topics, maxIterations=max_iterations)

In [23]:
wordNumbers = 10  
topicIndices = lda_model.describeTopics(maxTermsPerTopic = wordNumbers)
for x, topic in enumerate(topicIndices):
    print( 'topic nr' + str(x))
    words = topic[0]
    weights = topic[1]
    for n in range(len(words)):
        print( cvmodel.vocabulary[words[n]] + ' ' + str(weights[n]))

topic nr0
police 0.003026836559036033
said 0.0023938428473666944
chicago 0.0020294849055788183
would 0.0017995959594099917
people 0.0016666415414770958
trump 0.0015995023813387603
state 0.0015743508399034198
like 0.0015265946731479112
cubs 0.0014500453994729778
illinois 0.0014153173379954965
topic nr1
police 0.002768799482063461
said 0.002299502247016273
chicago 0.0020386231761737586
would 0.001847320725314155
people 0.0017589152424858955
cubs 0.001721257396690751
like 0.001655846293608911
shot 0.0015522402465540034
school 0.00153653243389785
first 0.0014661690712044026
topic nr2
police 0.0025155612302565594
said 0.0022951863645676697
chicago 0.001978295863316992
would 0.0019377921653733615
bears 0.001716779536532738
people 0.0016631916703633315
state 0.0016534585429663416
like 0.0016200811698885266
school 0.0015704608259383356
trump 0.001547619164500908
topic nr3
dogs 0.0032956169204022127
police 0.002768722719016405
said 0.0024206995328072663
chicago 0.0020151190172433566
would 0.001

topicIndices = sc.parallelize(lda_model.describeTopics(maxTermsPerTopic = wordNumbers))
def topic_render(topic):
    terms = topic[0]
    result = []
    for i in range(wordNumbers):
        term = vocabArray[terms[i]]
        result.append(term)
    return result
topics_final = topicIndices.map(lambda topic: topic_render(topic)).collect()
for topic in range(len(topics_final)):
    print ("Topic" + str(topic) + ":")
    for term in topics_final[topic]:
        print (term)
    print ('\n')