# Spark LDA Implementacion

In [1]:
from pyspark.sql.types import *
from pyspark.sql.functions import monotonically_increasing_id, col, expr, when, concat, lit, isnan
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorIndexer, VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, RegressionEvaluator, BinaryClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, CountVectorizer
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.linalg import Vectors, SparseVector
from pyspark.ml.clustering import LDA, BisectingKMeans
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
import nltk
import pandas as pd
import numpy as np
import re
import codecs
from nltk.corpus import stopwords
 
stop_words_nltk = list(set(stopwords.words('english')))

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
4,application_1560741669686_0005,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


In [2]:
spark

VBox()

<pyspark.sql.session.SparkSession object at 0x7ff23d81cad0>

## Leer Informacion desde S3
Leemos un news.csv desde s3 para cargarlo en un df de spark

In [3]:
df=spark.read.csv("s3a://finaltext/news.csv", inferSchema=True, header=True)

VBox()

In [4]:
df.printSchema()

VBox()

root
 |-- id: string (nullable = true)
 |-- id_news: string (nullable = true)
 |-- title: string (nullable = true)
 |-- publication: string (nullable = true)
 |-- author: string (nullable = true)
 |-- date: string (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- url: string (nullable = true)
 |-- content: string (nullable = true)

## Limpiamos y concatenamos la informacion content + author + title

In [5]:
df= df.fillna({'content': ''})
df= df.fillna({'author': ''})
df= df.fillna({'title': ''})

VBox()

In [6]:
df = df.select(concat(col("title"), lit(" "), col("content"), lit(" "), col("author")).alias("news"))

VBox()

In [7]:
df.show(15)

VBox()

+--------------------+
|                news|
+--------------------+
|House Republicans...|
|Rift Between Offi...|
|Tyrus Wong, ‘Bamb...|
|Among Deaths in 2...|
|Kim Jong-un Says ...|
|Sick With a Cold,...|
|Taiwan’s Presiden...|
|After ‘The Bigges...|
|First, a Mixtape....|
|Calling on Angels...|
|Weak Federal Powe...|
|Can Carbon Captur...|
|Mar-a-Lago, the F...|
|How to form healt...|
|Turning Your Vaca...|
+--------------------+
only showing top 15 rows

In [8]:
df = df.withColumn("uid", monotonically_increasing_id())     # Create Unique ID

VBox()

In [9]:
df.show(20)

VBox()

+--------------------+---+
|                news|uid|
+--------------------+---+
|House Republicans...|  0|
|Rift Between Offi...|  1|
|Tyrus Wong, ‘Bamb...|  2|
|Among Deaths in 2...|  3|
|Kim Jong-un Says ...|  4|
|Sick With a Cold,...|  5|
|Taiwan’s Presiden...|  6|
|After ‘The Bigges...|  7|
|First, a Mixtape....|  8|
|Calling on Angels...|  9|
|Weak Federal Powe...| 10|
|Can Carbon Captur...| 11|
|Mar-a-Lago, the F...| 12|
|How to form healt...| 13|
|Turning Your Vaca...| 14|
|As Second Avenue ...| 15|
|Dylann Roof Himse...| 16|
|Modi’s Cash Ban B...| 17|
|Suicide Bombing i...| 18|
|Fecal Pollution T...| 19|
+--------------------+---+
only showing top 20 rows

## Limpiamos las noticias

In [10]:
def cleanup_text(record):
    content = record[0]
    words = content.split()
    
    # Custom List of Stopwords - Add your own here
    stopwords_custom = ['']
    stopwords = stop_words_nltk + stopwords_custom
    stopwords = [word.lower() for word in stopwords]    
    
    text_out = [re.sub('[^a-zA-Z0-9]','',word) for word in words]                                       # Remove special characters
    text_out = [word.lower() for word in text_out if len(word)>2 and word.lower() not in stopwords]     # Remove stopwords and words under X length
    return text_out

VBox()

In [11]:
udf_cleantext = udf(cleanup_text , ArrayType(StringType()))
clean_text = df.withColumn("words", udf_cleantext(struct([df[x] for x in df.columns])))

VBox()

In [12]:
clean_text.show(2)

VBox()

+--------------------+---+--------------------+
|                news|uid|               words|
+--------------------+---+--------------------+
|House Republicans...|  0|[house, republica...|
|Rift Between Offi...|  1|[rift, officers, ...|
+--------------------+---+--------------------+
only showing top 2 rows

In [13]:
clean_text.cache()

VBox()

DataFrame[news: string, uid: bigint, words: array<string>]

In [14]:
clean_text.is_cached

VBox()

True

## Construimos el Bag of Words usando CountVectorizer y IDF de mllib de pyspark

In [15]:
cv = CountVectorizer(inputCol="words", outputCol="rawFeatures", vocabSize = 1000)
cvmodel = cv.fit(clean_text)
featurizedData = cvmodel.transform(clean_text)

vocab = cvmodel.vocabulary
vocab_broadcast = sc.broadcast(vocab)

idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

VBox()

In [16]:
rescaledData.show(10)

VBox()

+--------------------+---+--------------------+--------------------+--------------------+
|                news|uid|               words|         rawFeatures|            features|
+--------------------+---+--------------------+--------------------+--------------------+
|House Republicans...|  0|[house, republica...|(1000,[0,1,2,3,4,...|(1000,[0,1,2,3,4,...|
|Rift Between Offi...|  1|[rift, officers, ...|(1000,[0,2,3,4,5,...|(1000,[0,2,3,4,5,...|
|Tyrus Wong, ‘Bamb...|  2|[tyrus, wong, bam...|(1000,[0,2,3,4,5,...|(1000,[0,2,3,4,5,...|
|Among Deaths in 2...|  3|[among, deaths, 2...|(1000,[0,2,3,4,5,...|(1000,[0,2,3,4,5,...|
|Kim Jong-un Says ...|  4|[kim, jongun, say...|(1000,[0,1,3,5,7,...|(1000,[0,1,3,5,7,...|
|Sick With a Cold,...|  5|[sick, cold, quee...|(1000,[0,5,7,9,12...|(1000,[0,5,7,9,12...|
|Taiwan’s Presiden...|  6|[taiwans, preside...|(1000,[0,1,2,3,4,...|(1000,[0,1,2,3,4,...|
|After ‘The Bigges...|  7|[biggest, loser, ...|(1000,[0,2,3,4,5,...|(1000,[0,2,3,4,5,...|
|First, a 

In [17]:
rescaledData.cache()

VBox()

DataFrame[news: string, uid: bigint, words: array<string>, rawFeatures: vector, features: vector]

## Corremos el modleo para seleccion de topicos Latent Dirichlet Allocation LDA de pyspark

In [18]:
lda = LDA(k=10, seed=123, optimizer="em", featuresCol="features")

ldamodel = lda.fit(rescaledData)

ldatopics = ldamodel.describeTopics()

def map_termID_to_Word(termIndices):
    words = []
    for termID in termIndices:
        words.append(vocab_broadcast.value[termID])
    
    return words

udf_map_termID_to_Word = udf(map_termID_to_Word , ArrayType(StringType()))
ldatopics_mapped = ldatopics.withColumn("topic_desc", udf_map_termID_to_Word(ldatopics.termIndices))
ldatopics_mapped.select(ldatopics_mapped.topic, ldatopics_mapped.topic_desc).show(50,False)

VBox()

+-----+----------------------------------------------------------------------------------+
|topic|topic_desc                                                                        |
+-----+----------------------------------------------------------------------------------+
|0    |[said, police, trump, china, percent, would, people, north, new, states]          |
|1    |[says, said, like, people, one, trump, new, would, time, years]                   |
|2    |[north, korea, trump, said, nuclear, would, people, president, china, one]        |
|3    |[trump, said, people, tax, says, would, like, health, one, women]                 |
|4    |[trump, said, would, president, people, state, clinton, one, new, states]         |
|5    |[said, apple, trump, people, would, new, one, says, like, percent]                |
|6    |[trump, rubio, cruz, said, says, people, clinton, would, like, one]               |
|7    |[clinton, trump, sanders, comey, said, campaign, hillary, president, percent, fbi]|

In [19]:
ldaResults = ldamodel.transform(rescaledData)
ldaResults = ldaResults.select('uid','news','words','rawFeatures','features','topicDistribution')

VBox()

In [20]:
ldaResults.show(20)

VBox()

+---+--------------------+--------------------+--------------------+--------------------+--------------------+
|uid|                news|               words|         rawFeatures|            features|   topicDistribution|
+---+--------------------+--------------------+--------------------+--------------------+--------------------+
|  0|House Republicans...|[house, republica...|(1000,[0,1,2,3,4,...|(1000,[0,1,2,3,4,...|[0.07550939826576...|
|  1|Rift Between Offi...|[rift, officers, ...|(1000,[0,2,3,4,5,...|(1000,[0,2,3,4,5,...|[0.25086122717048...|
|  2|Tyrus Wong, ‘Bamb...|[tyrus, wong, bam...|(1000,[0,2,3,4,5,...|(1000,[0,2,3,4,5,...|[0.07064762513056...|
|  3|Among Deaths in 2...|[among, deaths, 2...|(1000,[0,2,3,4,5,...|(1000,[0,2,3,4,5,...|[0.03046392352706...|
|  4|Kim Jong-un Says ...|[kim, jongun, say...|(1000,[0,1,3,5,7,...|(1000,[0,1,3,5,7,...|[0.03956896048387...|
|  5|Sick With a Cold,...|[sick, cold, quee...|(1000,[0,5,7,9,12...|(1000,[0,5,7,9,12...|[0.09964954492012...|
|

In [21]:
def select_topic_udf(topicDistribution):
    dom = topicDistribution[0]
    index_dom = 0
    for index in range(len(topicDistribution)):
        if (topicDistribution[index]>dom):
            dom=topicDistribution[index]
            index_dom=index
    
    return index_dom

udf_seltop = udf(select_topic_udf , IntegerType())
ldaResults = ldaResults.withColumn("topic_prin", udf_seltop(ldaResults.topicDistribution))


ldaResults.show(10)

VBox()

+---+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|uid|                news|               words|         rawFeatures|            features|   topicDistribution|topic_prin|
+---+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|  0|House Republicans...|[house, republica...|(1000,[0,1,2,3,4,...|(1000,[0,1,2,3,4,...|[0.07550939826576...|         3|
|  1|Rift Between Offi...|[rift, officers, ...|(1000,[0,2,3,4,5,...|(1000,[0,2,3,4,5,...|[0.25086122717048...|         1|
|  2|Tyrus Wong, ‘Bamb...|[tyrus, wong, bam...|(1000,[0,2,3,4,5,...|(1000,[0,2,3,4,5,...|[0.07064762513056...|         1|
|  3|Among Deaths in 2...|[among, deaths, 2...|(1000,[0,2,3,4,5,...|(1000,[0,2,3,4,5,...|[0.03046392352706...|         1|
|  4|Kim Jong-un Says ...|[kim, jongun, say...|(1000,[0,1,3,5,7,...|(1000,[0,1,3,5,7,...|[0.03956896048387...|         2|
|  5|Sick With a Cold,..

In [None]:
df_pd = ldaResults.toPandas()