# Tutorial para trabalhar com topic modeling usando a biblioteca mllib do spark para aplicações em big data
Este tutorial vai apresentar o conceito de topic modeling superficialmente, o foco dele é como utilizar este método de classificação de texto usando a biblioteca mllib do spark. Essa biblioteca, assim como o spark, vai ser introduzida em mais detalhes conforme necessário para a sua aplicação em contextos genéricos de classificação de textos usando topic modeling.

![alt text](https://www.depends-on-the-definition.com/wp-content/uploads/2018/11/IntroToLDA.png)

## O que é topic modeling e para que serve?
Como introduzido, topic modeling é um método de classificação de textos. Topic modeling define tópicos e calcula a aderência de um texto a cada um destes tópicos, o resultado da aderência a tópico é um coeficiente respectivo. 

Existem várias formas de calcular os coeficientes de aderência de um texto a um conjunto de tópicos, a forma que vai ser usada neste texto é LDA - Latent Dirichlet Allocation. LDA vai ser explicado a seguir no tutorial, ensinar a matemática por trás do algoritmo não é o propósito deste tutorial mas é importante que fique claro como o cálculo LDA é bastante custoso em termos de processamento; ele foi a escolha deste tutorial para classificação de enormes quantidades de texto por que o Spark é uma ferramenta ótima de paralelização de processamento.


---


### Latent Dirichlet... que?
Deixa o nome pra lá por enquanto, ela será chamada de 'a coisa' para que o nome não cause confusão. Vamos focar primeiro no fato de 'a coisa' ser um método de classificação de documentos por tópicos, que tem parâmetros fixos e parâmetros latentes (guarda essa palavra!) - um subconjunto dentre os parâmetros latentes é o conjunto dos tópicos, a quantidade de tópicos é arbitrária. Se pensarmos no algoritmo como algo que fornece a probabilidade de um tópico ser o certo para um dado texto, e simplificando as variáveis um pouco, podemos descrevê-lo da seguinte maneira:

$$\begin{eqnarray}
P (&topico&|&documento,&parametros fixos) = Aderencia&do&texto&ao&topico&
\end{eqnarray}$$

Agora temos um propósito claro, achar o coeficiente de aderência do texto ao topico. Para chegarmos a esse objetivo, talvez seja mais fácil pensar no inverso desta probabilidade, ao invés de tentarmos saber a probabilidade do tópico dado o texto nós vamos tentar achar a probabilidade do texto dado o tópico; é confuso mas talvez isso ajude: vamos tentar gerar o texto que temos a partir de variar o valor das nossas variáveis que medem aderência a tópico e gerando um bloco de texto com elas.


<h3>Dirichlet</h3>

Como estamos usando LDA (Latent Dirichlet Allocation), é importante entender um pouco como funciona uma Dirichlet, pelo menos para o nosso caso específico.
O que precisamos obter com Dirichlet é a probabilidade de um tópico ser o certo para um dado documento.
Vamos quebrar a explicação para facilitar o entendimento: 

1. Escolher o tópico de onde virá esta palavra neste documento

2. Escolher a palavra de dentro do tópico escolhido

Vamos lá:

1. Escolher o tópico:

  *   Para escolher o tópico preciso de: probabilidade de tópico para aquele documento ($\alpha$)
      *   Cada documento tem suas próprias probabilidades do tópico: "Tópicos do documento"

  
  
  1. 1  Escolher as probabilidades de tópico no documento:

    *   Restrições: vetor meio esparso
      *   Obter este vetor de probabilidades como uma amostra de uma distribuição de Dirichlet



2. Escolher palavra do tópico:

  *   Para escolher o tópico preciso de: probabilidades de palavra por tópico ($\beta$)


  2. 1  Escolher as probabilidades da palavra no tópico($\varphi$):

    *   Restrições: vetor meio esparso
      *   Obter este vetor de probabilidades como uma amostra de uma distribuição de Dirichlet



3. Escolhido o tópico, escolher a palavra:

![alt text](https://miro.medium.com/max/635/1*qwA4jyRFBB6Htn3X4aftSw.png)




## O que é spark e mllib?
Uma busca rápida no google já te leva a se perder no mar de termos, "*You might already know Apache Spark as a fast and general engine for big data processing, with built-in modules for streaming, SQL, machine learning and graph processing*", na verdade não, se você está aqui você provavelmente não conhece spark ou mllib, então vamos nos introduzir a essas duas coisas. Spark é, afinal, uma engine de big data processing... o que quer dizer que Spark tem seu próprio método para distribuir o processamento de uma tarefa para várias maquinas com o intuito de reduzir o tempo que leva para processar conjuntos muito grandes de dados (big data). A gente precisa do spark, porque às vezes a análise de um problema necessita o processamento de dados, muitos e muitos dados, mais dados do que uma máquina sozinha seria capaz de processar. Então, posto que temos tanta informação para processar, vamos utilizar uma aplicação distribuida - spark - que consiga dividir esse bloco de trabalho enorme em bloquinhos menores e fazer máquinas diferentes processarem cada bloquinho para depois obter o mesmo resultado em menos tempo.

![alt text](https://dzone.com/storage/temp/9507196-data-flow2x-768x897.png)

Mllib é a biblioteca de machine learning do spark, ela vai permitir o uso LDA de forma otimizada para o processamento distribuido que o spark oferece. No tutorial, nós vamos aplicar ela localmente, mas a transição de local para em um cluster na AWS é extremamente fácil e vai ser mostrada aqui também.

## Por que usar eles ao invés de scikit-learn ou outra biblioteca de machine learning?

Vamos responder esta com uma atividade, no final deste tutorial, onde você vai tentar fazer algo parecido com o que estamos prestes a fazer com o spark e mllib, mas com uma biblioteca não distribuida - o scikit-learn; o objetivo desta atividade é mostrar que para volumes massivos de informação, é muito mais adequado utilizar uma aplicação distribuida como o spark, uma vez que o tempo para processar essa quantidade de dados em uma máquina se torna inadequado.

## Seria prudente ter uma noção sólida dos seguintes conceitos para passar deste ponto:

*   Programação em Python
*   Topic modeling (nada demais, só ter certeza que o conteúdo do tutorial até agora ficou claro)

## Para começar, você vai precisar ter acesso a:

*   Uma máquina Ubuntu (O tutorial não foi testado em outros OS's)

#### E nessa maquina, vai ser necessário ter instalado:
*   Python
*   Pip
*   Java (recomendamos a versão 8 do jdk)


## Como instalar algumas partes que talvez você não tenha
Supondo que você não tenha já o pyspark e o findspark, segue como instalar eles - para usar spark pelo python:

In [1]:
!pip install -q pyspark
!pip install -q findspark

## Antes de começar, vamos importar tudo que vamos usar:

In [2]:
import findspark
import os
import pyspark
import string

from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql import SparkSession, SQLContext
from pyspark import SparkContext
from pyspark.mllib.util import MLUtils
from pyspark.sql.types import *
from pyspark.ml.feature import CountVectorizer, CountVectorizerModel, Tokenizer, RegexTokenizer, StopWordsRemover
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vector, Vectors

Uau, parece um monte de coisa, e é! Tudo isso é necessario para que seja possivel o uso do spark via python - e no findspark init, você talvez tenha um diretorio diferente onde você instalou o spark e é esse repositorio que devia vir aqui!

## Começando, vamos iniciar uma seção spark e ajustar tudo agora que temos as bibliotecas importadas

Essa é a porta de entrada para desenvolvimento em Spark, aqui a gente inicia a seção Spark em cima da qual todo o resto da aplicação vai rodar

In [3]:
sc = pyspark.SparkContext(appName = "LDA_app")

sqlContext = SQLContext(sc)

sqlContext = SQLContext(sc)

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

## Começando, vamos obter dados
Nós vamos usar dados deste db, contendo as letras de musicas:

https://www.kaggle.com/mousehead/songlyrics/download

(~22MB)

Com os dados em mãos, precisamos alimentar eles ao Spark, da seguinte forma:

In [4]:
dataset_location = 'songdata.csv'

# Ler os dados do arquivo 
data_df = spark.read.csv(dataset_location,header=True, multiLine=True,sep=",");

# Excluir as linhas nulas, que não contem dados
data_df = data_df.na.drop()

Nessa etapa, nós estamos pegando os dados usando a spark.read.csv, os parametros header e multiline indicam que a primeira linha de dados é na verdade a linha que da nome às colunas e que a informação da tabela pode ter quebras de linha, famosos "\n", respectivamente. Nós utilizamos tambem o parametro sep, para indicar que a informação ta separada por uma virgula no arquivo .csv.

## Dados obtidos, vamos agora 
Tokenizar os dados. Mais uma palavra chave, tokenizar, que nada mais significa do que aglutinar tudo aquilo que é parecido - como estamos trabalhando com letras de musicas, tem muita abreviação, palavras escritas de forma estilizada e coisas do tipo, mas só nos interessa uma variação destas, contanto que ela tenha significado sintatico diferente das outras palavras. Por exemplo, "feel", "feeling", "feels" vão se tornar uma palavra só.

In [5]:
# Tokenizando as palavras da coluna texto e 
# adicionando uma coluna palavras para guardar essa informação
tokenizer = Tokenizer(inputCol="text", outputCol="words")
wordsDataFrame = tokenizer.transform(data_df)

## Agora dando uma limpada nos dados
Vamos tirar as 20 palavras mais recorrentes, as que tem caracteres numericos e as com menos de 3 caracteres, para isso, vai ser necessario executarmos alguns passos

In [6]:
# Essas duas linhas agrupam as palavras que nós não queremos
cv_tmp = CountVectorizer(inputCol="words", outputCol="tmp_vectors")
cv_tmp_model = cv_tmp.fit(wordsDataFrame)

# Aqui vão ficar as nossas 20 palavras mais fraquentes
top20 = list(cv_tmp_model.vocabulary[0:20])

# Aqui vao ficar as que tem menos de 3 caracteres
more_then_3_charachters = [word for word in cv_tmp_model.vocabulary if len(word) <= 3]

# Aqui vão ficar as que tem digitos numericos
contains_digits = [word for word in cv_tmp_model.vocabulary if any(char.isdigit() for char in word)]

#
# Voce pode adicionar palavras que deseja filtrar nesta lista!
#
stopwords = [] 

# Juntando as 3 listas de coisas que não queremos
stopwords = stopwords + top20 + more_then_3_charachters + contains_digits

# Podemos remover as palavras que não queremos, finalmente
remover = StopWordsRemover(inputCol="words", outputCol="filtered", stopWords = stopwords)
wordsDataFrame = remover.transform(wordsDataFrame)

## Dando uns passos para trás

Vamos repetir o processo de agrupar as palavras, agora que filtramos as que não eram de interesse

In [7]:
#Create a new CountVectorizer model without the stopwords
Vector = CountVectorizer(inputCol="filtered", outputCol="vectors")
model = Vector.fit(wordsDataFrame)
result = model.transform(wordsDataFrame)

## Alguem falou burocracia??
É, até no codigo tem burocracia... Precisamos formatar a informação que temos para ela ser palatavel para o mllib

In [9]:
# Adicionando um id unico para cada item (a funcao não faz isso automaticamente, mas como só vamos fazer uma vez, sabemos que será unico)
# Alem disso, estamos tambem pegando só a informação que queremos, e com a ajuda da funcao, temos ela na ordem [id's, vetores]
sparsevector = result.withColumn("id", monotonically_increasing_id()).select('id', 'vectors')

# Aqui convertemos o que temos para algo que o mllib consegue usar
sparsevector = MLUtils.convertMatrixColumnsFromML(sparsevector)

corpus = sparsevector.select("id", "vectors").rdd.map(lambda x: [x[0], Vectors.fromML(x[1])]).cache()# , lambda y: Vectors.fromML(y)

## Cream of the crop
Aqui a gente treina nosso modelo LDA! Poderiamos ter escolhido varios parametros, mas deixamos ele nos defaults que são aceitaveis nesse caso

In [10]:
ldaModel = LDA.train(corpus)

Finalmente, os resultados!!

In [11]:
topics = ldaModel.describeTopics(maxTermsPerTopic = 15)
for x, topic in enumerate(topics):
    print ('topic nr: ' + str(x))
    words = topic[0]
    weights = topic[1]
    for n in range(len(words)):
        print (model.vocabulary[words[n]] + ' ' + str(weights[n]))

topic nr: 0
don't 0.0113232563906
like 0.0102195759537
down 0.0100079298242
with 0.00919105864446
yeah 0.00861946739072
just 0.00846448928641
back 0.00843188729723
know 0.00793055364
come 0.00785361222616
road 0.0078441810019
it's 0.00771851648569
when 0.00768491664197
ain't 0.00756118637059
gonna 0.00727239154219
what 0.00674568699061
topic nr: 1
don't 0.013463795635
it's 0.0110282608979
just 0.0107034781696
know 0.0101012005968
this 0.0100083645252
with 0.00993350480972
when 0.00956380199466
take 0.00899438713952
make 0.00884034921256
right 0.00868516382029
want 0.00866366173536
you're 0.00851897121901
like 0.0084817593158
what 0.00806905057977
find 0.00787973784165
topic nr: 2
know 0.0136316160377
will 0.010677639695
don't 0.0104866605117
this 0.0102288705887
when 0.0102004470962
what 0.00995430320224
just 0.00984628230795
it's 0.00938644639121
with 0.00862754451543
i'll 0.00831802445141
time 0.00711969661268
can't 0.00688587529805
never 0.00686974864572
you're 0.00663386988214
like