# LDA con pySpark 


In [2]:
import os
os.cpu_count()

4

In [3]:
!echo $(($(getconf _PHYS_PAGES) * $(getconf PAGE_SIZE) / (1024 * 1024)))

7851


## Crear Sesión Colab

In [4]:
# Crear una sesión de spark

from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .appName("Linux")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

In [5]:
spark

## Importar bibliotecas 

In [6]:
from pyspark.sql.types import *
from pyspark.sql.functions import udf, concat, split, col
from pyspark.ml.feature import RegexTokenizer, NGram, VectorAssembler, CountVectorizer, IDF
from pyspark.ml.clustering import LDA, LocalLDAModel
from pyspark.ml import Pipeline
from nltk.stem import WordNetLemmatizer


import re
from nltk.corpus import stopwords
import nltk
nltk.download('stopwords')
nltk.download('wordnet')  #WordNetLemmatizer


[nltk_data] Downloading package stopwords to
[nltk_data]     /home/anaisabel/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package wordnet to
[nltk_data]     /home/anaisabel/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


True

# Leer los datos

In [7]:
df = spark.read.csv("datos/abcnews-date-text.csv",header=True)

print('Cantidad de renglones: ', df.count())

Cantidad de renglones:  1226258


In [8]:
df.printSchema() # explorar la estructura el dataframe

root
 |-- publish_date: string (nullable = true)
 |-- headline_text: string (nullable = true)



In [9]:
df.head(3)

[Row(publish_date='20030219', headline_text='aba decides against community broadcasting licence'),
 Row(publish_date='20030219', headline_text='act fire witnesses must be aware of defamation'),
 Row(publish_date='20030219', headline_text='a g calls for infrastructure protection summit')]

In [10]:
df.show(10) # default 20 renglones

+------------+--------------------+
|publish_date|       headline_text|
+------------+--------------------+
|    20030219|aba decides again...|
|    20030219|act fire witnesse...|
|    20030219|a g calls for inf...|
|    20030219|air nz staff in a...|
|    20030219|air nz strike to ...|
|    20030219|ambitious olsson ...|
|    20030219|antic delighted w...|
|    20030219|aussie qualifier ...|
|    20030219|aust addresses un...|
|    20030219|australia is lock...|
+------------+--------------------+
only showing top 10 rows



In [11]:
df.select('headline_text').take(1)

[Row(headline_text='aba decides against community broadcasting licence')]

In [12]:
# raw text of the first entry 
df.select('headline_text').head(1)[0][0]

'aba decides against community broadcasting licence'

In [13]:
type(df)

pyspark.sql.dataframe.DataFrame

Queremos un inice consecutivo, para ello vamos a utilizar solo la columna 'hedline_text' y usando rdd creamos el indice

In [14]:
texts = df.rdd.map(lambda x: x['headline_text'])
headlines=texts.zipWithIndex( )                    

In [15]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(spark)
#Creating dataframe
data = sqlContext.createDataFrame(headlines, ["headlines",'index'])

# Preprocesar Texto

## Normalizar y tokenizar

In [16]:
removePunct = udf(lambda s: s.strip().lower(), StringType())

data_norm = data.withColumn("text", removePunct(data.headlines))

In [17]:
# tokenize 
tokenizer = RegexTokenizer(inputCol="text", outputCol="words",
                           gaps=True, pattern=r'\s+', minTokenLength=4)
df_tokens = tokenizer.transform(data_norm)

In [18]:
df_tokens.show()

+--------------------+-----+--------------------+--------------------+
|           headlines|index|                text|               words|
+--------------------+-----+--------------------+--------------------+
|aba decides again...|    0|aba decides again...|[decides, against...|
|act fire witnesse...|    1|act fire witnesse...|[fire, witnesses,...|
|a g calls for inf...|    2|a g calls for inf...|[calls, infrastru...|
|air nz staff in a...|    3|air nz staff in a...|[staff, aust, str...|
|air nz strike to ...|    4|air nz strike to ...|[strike, affect, ...|
|ambitious olsson ...|    5|ambitious olsson ...|[ambitious, olsso...|
|antic delighted w...|    6|antic delighted w...|[antic, delighted...|
|aussie qualifier ...|    7|aussie qualifier ...|[aussie, qualifie...|
|aust addresses un...|    8|aust addresses un...|[aust, addresses,...|
|australia is lock...|    9|australia is lock...|[australia, locke...|
|australia to cont...|   10|australia to cont...|[australia, contr...|
|barca

## Removing stopwords

In [19]:
stopwords = stopwords.words("english")
removeStop=udf(lambda word: [x for x in word if x not in stopwords])
df_tokens=df_tokens.withColumn('noStopWords',removeStop(df_tokens['words']))

In [20]:
df_tokens.show()

+--------------------+-----+--------------------+--------------------+--------------------+
|           headlines|index|                text|               words|         noStopWords|
+--------------------+-----+--------------------+--------------------+--------------------+
|aba decides again...|    0|aba decides again...|[decides, against...|[decides, communi...|
|act fire witnesse...|    1|act fire witnesse...|[fire, witnesses,...|[fire, witnesses,...|
|a g calls for inf...|    2|a g calls for inf...|[calls, infrastru...|[calls, infrastru...|
|air nz staff in a...|    3|air nz staff in a...|[staff, aust, str...|[staff, aust, str...|
|air nz strike to ...|    4|air nz strike to ...|[strike, affect, ...|[strike, affect, ...|
|ambitious olsson ...|    5|ambitious olsson ...|[ambitious, olsso...|[ambitious, olsso...|
|antic delighted w...|    6|antic delighted w...|[antic, delighted...|[antic, delighted...|
|aussie qualifier ...|    7|aussie qualifier ...|[aussie, qualifie...|[aussie, q

## Lematización

In [21]:
lemma= WordNetLemmatizer()
def lematizacion(in_vec):
    out_vec = [lemma.lemmatize(w) for w in in_vec]
    return out_vec

lemma_udf = udf(lambda x:lematizacion(x),ArrayType(StringType()))
df_tokens=df_tokens.withColumn('finalwords',lemma_udf(df_tokens['noStopWords']))


In [22]:
df_tokens.show()

+--------------------+-----+--------------------+--------------------+--------------------+--------------------+
|           headlines|index|                text|               words|         noStopWords|          finalwords|
+--------------------+-----+--------------------+--------------------+--------------------+--------------------+
|aba decides again...|    0|aba decides again...|[decides, against...|[decides, communi...|[decides, communi...|
|act fire witnesse...|    1|act fire witnesse...|[fire, witnesses,...|[fire, witnesses,...|[fire, witness, m...|
|a g calls for inf...|    2|a g calls for inf...|[calls, infrastru...|[calls, infrastru...|[call, infrastruc...|
|air nz staff in a...|    3|air nz staff in a...|[staff, aust, str...|[staff, aust, str...|[staff, aust, str...|
|air nz strike to ...|    4|air nz strike to ...|[strike, affect, ...|[strike, affect, ...|[strike, affect, ...|
|ambitious olsson ...|    5|ambitious olsson ...|[ambitious, olsso...|[ambitious, olsso...|[ambi

In [23]:
df_tokens.printSchema()

root
 |-- headlines: string (nullable = true)
 |-- index: long (nullable = true)
 |-- text: string (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- noStopWords: string (nullable = true)
 |-- finalwords: array (nullable = true)
 |    |-- element: string (containsNull = true)



###N-Grams

## Vectorizar con unigramas y bigramas

En pyspark tenemos que primero que crear todos los n-gramas que nos interesen y luego utilizar el modelo CountVectorizer y unirlo todo utilizando VectorAssembler

In [24]:
def build_ngrams(inputCol="finalwords", n=2):

    ngrams = [
        NGram(n=i, inputCol="finalwords", outputCol="{0}_grams".format(i))
        for i in range(1, n + 1)
    ]

    vectorizers = [
        CountVectorizer(inputCol="{0}_grams".format(i),
            outputCol="{0}_counts".format(i), minDF=20)
        for i in range(1, n + 1)
    ]

    assembler = [VectorAssembler(
        inputCols=["{0}_counts".format(i) for i in range(1, n + 1)],
        outputCol="features_cv"
    )]

    return Pipeline(stages=ngrams + vectorizers + assembler)

In [60]:
%%time
# TF
pipline_vectorizer_ngrams = build_ngrams().fit(df_tokens)


CPU times: user 48.4 ms, sys: 7.93 ms, total: 56.3 ms
Wall time: 2min 29s


In [61]:
pipline_vectorizer_ngrams.stages

[NGram_4bd4ad453bcd,
 NGram_bcbfc2431b84,
 CountVectorizerModel: uid=CountVectorizer_dcb2c6c639e9, vocabularySize=16599,
 CountVectorizerModel: uid=CountVectorizer_cd8ca00483ce, vocabularySize=19985,
 VectorAssembler_2423b77deccf]

In [62]:
vectorizers = [s for s in pipline_vectorizer_ngrams.stages if isinstance(s, CountVectorizerModel)]
vocabArray = [v.vocabulary for v in vectorizers]
len(vocabArray)

2

In [69]:
# Con cuantas palabras y bigramas nos quedamos
palabras=vocabArray[0]
biGramas=vocabArray[1]

In [71]:
# muestra de bigramas
biGramas[0:20]

['gold coast',
 'country hour',
 'donald trump',
 'face court',
 'pleads guilty',
 'asylum seeker',
 'mental health',
 'climate change',
 'police investigate',
 'north korea',
 'police probe',
 'broken hill',
 'share market',
 'rate rise',
 'royal commission',
 'police officer',
 'plane crash',
 'body found',
 'front court',
 'govt urged']

In [74]:
vocabulario = palabras + biGramas
len(vocabulario)

36584

__ahora hacemos la transformación con la vectorización hecha para obtener TF__

In [72]:
%%time
result_cv = pipline_vectorizer_ngrams.transform(df_tokens)

CPU times: user 30.8 ms, sys: 313 µs, total: 31.1 ms
Wall time: 585 ms


In [27]:
result_cv.columns  # la vectorización se encuentra en la última columna

['headlines',
 'index',
 'text',
 'words',
 'noStopWords',
 'finalwords',
 '1_grams',
 '2_grams',
 '1_counts',
 '2_counts',
 'features_cv']

In [73]:
result_cv.take(1)

[Row(headlines='aba decides against community broadcasting licence', index=0, text='aba decides against community broadcasting licence', words=['decides', 'against', 'community', 'broadcasting', 'licence'], noStopWords='[decides, community, broadcasting, licence]', finalwords=['decides', 'community', 'broadcasting', 'licence'], 1_grams=['decides', 'community', 'broadcasting', 'licence'], 2_grams=['decides community', 'community broadcasting', 'broadcasting licence'], 1_counts=SparseVector(16599, {111: 1.0, 958: 1.0, 5213: 1.0, 8099: 1.0}), 2_counts=SparseVector(19985, {}), features_cv=SparseVector(36584, {111: 1.0, 958: 1.0, 5213: 1.0, 8099: 1.0}))]

__es el turno de obtener IDF__

In [29]:
%%time
# IDF
idf = IDF(inputCol="features_cv", outputCol="features")
idfModel = idf.fit(result_cv)
result_tfidf = idfModel.transform(result_cv) 

CPU times: user 14 ms, sys: 0 ns, total: 14 ms
Wall time: 1min 6s


In [75]:
result_tfidf.take(1)

[Row(headlines='aba decides against community broadcasting licence', index=0, text='aba decides against community broadcasting licence', words=['decides', 'against', 'community', 'broadcasting', 'licence'], noStopWords='[decides, community, broadcasting, licence]', finalwords=['decides', 'community', 'broadcasting', 'licence'], 1_grams=['decides', 'community', 'broadcasting', 'licence'], 2_grams=['decides community', 'community broadcasting', 'broadcasting licence'], 1_counts=SparseVector(16599, {111: 1.0, 958: 1.0, 5213: 1.0, 8099: 1.0}), 2_counts=SparseVector(19985, {}), features_cv=SparseVector(36584, {111: 1.0, 958: 1.0, 5213: 1.0, 8099: 1.0}), features=SparseVector(36584, {111: 5.2528, 958: 6.8735, 5213: 9.0088, 8099: 9.729}))]

## Train Model

In [31]:
%%time
#split 80% train set and 20% test set
df_training, df_testing = result_tfidf.randomSplit([0.8, 0.2], 1)
print('Training and testing documents: ', df_training.count(), df_testing.count())

Training and testing documents:  981475 244783
CPU times: user 16 ms, sys: 1.54 ms, total: 17.6 ms
Wall time: 3min 20s


In [32]:
%%time
num_topics=30
max_iterations=50
lda = LDA(k=num_topics, maxIter=max_iterations)
ldaModel = lda.fit(result_tfidf)

CPU times: user 49.4 ms, sys: 24.9 ms, total: 74.2 ms
Wall time: 12min


### Guardar los modelos

In [33]:
print(ldaModel.isDistributed())
path = "modelos/"

model_number = '2'
pipline_vectorizer_ngrams.save(path + 'PipelineVectorizerModel'+ model_number)    # Modelo BOW
ldaModel.save(path + 'LDAModel'+ model_number)  # Modelo entrenado
lda.save(path + 'LDA_'+ model_number)
idfModel.save(path + 'idfModel'+ model_number) 


False


# Cargar modelos

In [43]:
from pyspark.ml import PipelineModel
from pyspark.ml.clustering import LocalLDAModel
path = "modelos/"
model_number = '2'
pipline_vectorizer_ngrams = PipelineModel.load(path + 'PipelineVectorizerModel'+ model_number )   # Modelo BOW
#lda = LocalLDAModel.load(path + 'LDA_'+ model_number)
ldaModel = LocalLDAModel.load(path + 'LDAModel'+ model_number)               # Modelo entrenado

In [83]:
# Print topics and top-weighted terms
numTopics = 30  # cantidad de topicos a explorar
topics = ldaModel.describeTopics(maxTermsPerTopic=5)
ListOfIndexToWords = udf(lambda wl: list([vocabulario[w] for w in wl]))
FormatNumbers = udf(lambda nl: ["{:1.4f}".format(x) for x in nl])

toptopics = topics.select((topics.topic + 1).alias('topic'),
                          ListOfIndexToWords(topics.termIndices).alias('words'),
                          FormatNumbers(topics.termWeights).alias('weights'))
toptopics.show(truncate=False, n=numTopics)
print('Topics:', numTopics, 'Vocabulary:', len(vocabArray))



+-----+--------------------------------------------+----------------------------------------+
|topic|words                                       |weights                                 |
+-----+--------------------------------------------+----------------------------------------+
|1    |[face, court, budget, challenge, delay]     |[0.0219, 0.0164, 0.0159, 0.0111, 0.0108]|
|2    |[attack, charged, teen, hobart, assault]    |[0.0362, 0.0266, 0.0226, 0.0145, 0.0143]|
|3    |[gold, trump, coast, offer, premier]        |[0.0169, 0.0145, 0.0133, 0.0123, 0.0109]|
|4    |[island, station, title, number, highway]   |[0.0157, 0.0138, 0.0131, 0.0117, 0.0109]|
|5    |[murder, business, spark, storm, accused]   |[0.0193, 0.0121, 0.0105, 0.0096, 0.0086]|
|6    |[high, arrested, rule, threat, told]        |[0.0155, 0.0147, 0.0116, 0.0116, 0.0114]|
|7    |[violence, second, india, aust, strike]     |[0.0115, 0.0113, 0.0108, 0.0100, 0.0093]|
|8    |[election, club, scott, alice, video]       |[0.0229,

# Primeros pasos para la visualización con pyLDAVis

In [84]:
import pyLDAvis

[pista](https://stackoverflow.com/questions/41819761/pyldavis-visualization-of-pyspark-generated-lda-model)

ef prepare(topic_term_dists, doc_topic_dists, doc_lengths, vocab, term_frequency):
   """Transforms the topic model distributions and related corpus data into
   the data structures needed for the visualization.
    Parameters
    ----------
    topic_term_dists : array-like, shape (n_topics, n_terms)
        Matrix of topic-term probabilities. Where n_terms is len(vocab).
    doc_topic_dists : array-like, shape (n_docs, n_topics)
        Matrix of document-topic probabilities.
    doc_lengths : array-like, shape n_docs
        The length of each document, i.e. the number of words in each document.
        The order of the numbers should be consistent with the ordering of the
        docs in doc_topic_dists.
    vocab : array-like, shape n_terms
        List of all the words in the corpus used to train the model.
    term_frequency : array-like, shape n_terms
        The count of each particular term over the entire corpus. The ordering
        of these counts should correspond with `vocab` and topic_term_dists.



In [86]:
topic_term_dists=ldaModel.topicsMatrix().toArray()
print(type(topic_term_dists))
topic_term_dists.shape

  and should_run_async(code)


<class 'numpy.ndarray'>


(36584, 30)