In [2]:
import findspark
findspark.init('/home/diegomarques/spark-2.3.0-bin-hadoop2.7')

from os import listdir
from os.path import isfile, join

In [3]:
from pyspark.sql.types import *

In [4]:
import numpy as np
import unicodedata

In [5]:
from pyspark.sql.functions import * 

In [6]:
from __future__ import print_function

In [7]:
path = '/home/diegomarques/Documents/Mestrado/Data/mes'
input_files = [f for f in listdir(path) if isfile(join(path, f))]

In [8]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext('local', 'LDA Mes')
spark = SparkSession(sc)

In [9]:
data = spark.read.format('com.databricks.spark.csv').options(header='true', inferschema='true', delimiter=';').load(path)

In [18]:
data.printSchema()

root
 |-- CodNot: integer (nullable = true)
 |-- ComNot: string (nullable = true)
 |-- edtNome: string (nullable = true)



In [None]:
data.count()

In [None]:
data.head(1)

In [11]:
data = data.sample(False, 0.10, 1234)

In [12]:
data.count()

60886

In [None]:
# data = data.drop('edtNome')
# data.printSchema()

In [13]:
data = data.na.drop(subset=['CodNot', 'ComNot', 'edtNome'])

In [14]:
data = data.withColumn('ComNot', lower(data.ComNot))
data = data.withColumn("ComNot", trim(data.ComNot))
data = data.withColumn("edtNome", trim(data.edtNome))
data = data.withColumn('ComNot', regexp_replace('ComNot', '<.*?>', ''))
data = data.withColumn('ComNot', regexp_replace('ComNot','\s\s', ''))

In [15]:
def remove_accents_(inputStr):
    # first, normalize strings:
    nfkdStr = unicodedata.normalize('NFKD', inputStr)
    # Keep chars that has no other char combined (i.e. accents chars)
    withOutAccents = u"".join([c for c in nfkdStr if not unicodedata.combining(c)])
    return withOutAccents

remove_accents = udf(lambda c: remove_accents_(c) if c != None else c, StringType())

In [16]:
data = data.withColumn('ComNot', remove_accents(data.ComNot))
data = data.withColumn('ComNot', regexp_replace('ComNot','[^\sça-zA-Z|^-|^_]', ''))
data = data.dropDuplicates(subset=['ComNot'])

In [14]:
data.head(10)

[Row(CodNot=49204549, ComNot='    e  de maio de  | sexta e sabado h e domingo h em pleno tempo de homenagens a tantos icones da historia artisticocultural de toda ordem surgiu a ideia de se falar sobre um grande heroi francisco de assis tudo com muito humor como e caracteristico na trajetoria de sucesso  de carlos nunes nos palcos mineiros isso alem da riqueza historica e uma boa dose de poesia ele divide o palco com o ator andre mauricio com direcao do proprio carlos nunes e texto de marcio ares o espetaculo traduz a imagem nao so do santo mas do homem genial que foi francisco da sua infancia ate a sua morte tomase dele a humanidade com que enfrentou o seu desafio de mais amar que ser amado a ideia e fazer rir sem ofender e divertir sem blasfemar sem ferir a aura de santo que lhe e peculiar o espetaculo mostra sob uma nova e divertida perspectiva a sua fraterna existencia sobre a terra explica o ator a italia do seculo xiii onde e quando viveu francisco vai se estender ate os tempos a

In [None]:
data.count()

In [17]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, StopWordsRemover, RegexTokenizer

In [18]:
tk_model = RegexTokenizer(minTokenLength=2, inputCol="ComNot", outputCol="tkComNot", pattern="\\W")
data = tk_model.transform(data)

In [None]:
data.select("tkComNot").head(1)

In [None]:
stopwords_with_accents = ['de', 'a', 'o', 'que', 'e', 'do', 'da', 'em', 'um', 'para', 'com', 'não', 'uma', 'os', 'no', 'se', 'na', 'por',
             'mais', 'as', 'dos', 'como', 'mas', 'foi', 'ao', 'ele', 'das', 'tem', 'à', 'seu', 'sua', 'ou', 'ser', 'quando',
             'muito', 'há', 'nos', 'já', 'está', 'eu', 'também', 'só', 'pelo', 'pela', 'até', 'isso', 'ela', 'entre', 'era',
             'depois', 'sem', 'mesmo', 'aos', 'ter', 'seus', 'quem', 'nas', 'me', 'esse', 'eles', 'estão', 'você', 'tinha',
             'foram', 'essa', 'num', 'nem', 'suas', 'meu', 'às', 'minha', 'têm', 'numa', 'pelos', 'elas', 'havia', 'seja',
             'qual', 'será', 'nós', 'tenho', 'lhe', 'deles', 'essas', 'esses', 'pelas', 'este', 'fosse', 'dele', 'tu',
             'te', 'vocês', 'vos', 'lhes', 'meus', 'minhas', 'teu', 'tua', 'teus', 'tuas', 'nosso', 'nossa', 'nossos',
             'nossas', 'dela', 'delas', 'esta', 'estes', 'estas', 'aquele', 'aquela', 'aqueles', 'aquelas', 'isto', 'aquilo,
             'estou', 'está', 'estamos', 'estão', 'estive', 'esteve', 'estivemos', 'estiveram', 'estava', 'estávamos',
             'estavam', 'estivera', 'estivéramos', 'esteja', 'estejamos', 'estejam', 'estivesse', 'estivéssemos',
             'estivessem', 'estiver', 'estivermos', 'estiverem', 'hei', 'há', 'havemos', 'hão', 'houve', 'houvemos',
             'houveram', 'houvera', 'houvéramos', 'haja', 'hajamos', 'hajam', 'houvesse', 'houvéssemos', 'houvessem',
             'houver', 'houvermos', 'houverem', 'houverei', 'houverá', 'houveremos', 'houverão', 'houveria', 'houveríamos',
             'houveriam', 'sou', 'somos', 'são', 'era', 'éramos', 'eram', 'fui', 'foi', 'fomos', 'foram', 'fora', 'fôramos',
             'seja', 'sejamos', 'sejam', 'fosse', 'fôssemos', 'fossem', 'for', 'formos', 'forem', 'serei', 'será',
             'seremos', 'serão', 'seria', 'seríamos', 'seriam', 'tenho', 'tem', 'temos', 'tém', 'tinha', 'tínhamos',
             'tinham', 'tive', 'teve', 'tivemos', 'tiveram', 'tivera', 'tivéramos', 'tenha', 'tenhamos', 'tenham',
             'tivesse', 'tivéssemos', 'tivessem', 'tiver', 'tivermos', 'tiverem', 'terei', 'terá', 'teremos', 'terão',
             'teria', 'teríamos', 'teriam']

In [19]:
stopwords_without_accents = ['de', 'a', 'o', 'que', 'e', 'do', 'da', 'em', 'um', 'para', 'com', 'nao', 'uma', 'os', 'no', 'se', 'na', 'por',
             'mais', 'as', 'dos', 'como', 'mas', 'foi', 'ao', 'ele', 'das', 'tem', 'seu', 'sua', 'ou', 'ser', 'quando',
             'muito', 'ha', 'nos', 'ja', 'esta', 'eu', 'também', 'so', 'pelo', 'pela', 'ate', 'isso', 'ela', 'entre', 'era',
             'depois', 'sem', 'mesmo', 'aos', 'ter', 'seus', 'quem', 'nas', 'me', 'esse', 'eles', 'estao', 'voce', 'tinha',
             'foram', 'essa', 'num', 'nem', 'suas', 'meu', 'minha', 'tem', 'numa', 'pelos', 'elas', 'havia', 'seja',
             'qual', 'sera', 'nos', 'tenho', 'lhe', 'deles', 'essas', 'esses', 'pelas', 'este', 'fosse', 'dele', 'tu',
             'te', 'voces', 'vos', 'lhes', 'meus', 'minhas', 'teu', 'tua', 'teus', 'tuas', 'nosso', 'nossa', 'nossos',
             'nossas', 'dela', 'delas', 'esta', 'estes', 'estas', 'aquele', 'aquela', 'aqueles', 'aquelas', 'isto', 'aquilo',
             'estou', 'estamos', 'estao', 'estive', 'esteve', 'estivemos', 'estiveram', 'estava', 'estavamos',
             'estavam', 'estivera', 'estiveramos', 'esteja', 'estejamos', 'estejam', 'estivesse', 'estivessemos',
             'estivessem', 'estiver', 'estivermos', 'estiverem', 'hei', 'ha', 'havemos', 'hao', 'houve', 'houvemos',
             'houveram', 'houvera', 'houveramos', 'haja', 'hajamos', 'hajam', 'houvesse', 'houvessemos', 'houvessem',
             'houver', 'houvermos', 'houverem', 'houverei', 'houvera', 'houveremos', 'houverao', 'houveria', 'houveríamos',
             'houveriam', 'sou', 'somos', 'sao', 'era', 'eramos', 'eram', 'fui', 'foi', 'fomos', 'foram', 'fora', 'foramos',
             'seja', 'sejamos', 'sejam', 'fosse', 'fossemos', 'fossem', 'for', 'formos', 'forem', 'serei', 'sera',
             'seremos', 'serao', 'seria', 'seriamos', 'seriam', 'tenho', 'tem', 'temos', 'tem', 'tinha', 'tinhamos',
             'tinham', 'tive', 'teve', 'tivemos', 'tiveram', 'tivera', 'tiveramos', 'tenha', 'tenhamos', 'tenham',
             'tivesse', 'tivessemos', 'tivessem', 'tiver', 'tivermos', 'tiverem', 'terei', 'tera', 'teremos', 'terao',
             'teria', 'teriamos', 'teriam']

#https://gist.githubusercontent.com/alopes/5358189/raw/2107d809cca6b83ce3d8e04dbd9463283025284f/stopwords.txt

In [20]:
StopWordsRemover.loadDefaultStopWords('portuguese')
remover = StopWordsRemover(inputCol="tkComNot", outputCol="filteredTKComNot", stopWords=stopwords_without_accents)
data = remover.transform(data)

In [21]:
data = data.na.drop(subset=['filteredTKComNot'])
#remove_empty_tokens = udf(lambda c: list(filter(None, c)), StringType()))
#data = data.withColumn('filteredTKComNot', remove_empty_tokens(data.filteredTKComNot))
#data.where(col('tkComNot').isNull()).show()

In [20]:
data.select("filteredTKComNot").head(1)

[Row(filteredTKComNot=['unidades', 'regionais', 'saude', 'bairros', 'cristovao', 'boa', 'vista', 'reformadas', 'atendimento', 'usuarios', 'alterado', 'partir', 'desta', 'quartafeira', 'atendimento', 'antiga', 'urs', 'nidia', 'modesto', 'veludo', 'praca', 'augusto', 'lemp', 'lado', 'farmacia', 'solidaria', 'atendimento', 'local', 'provisorio', 'enquanto', 'predios', 'unidades', 'passam', 'reforma', 'visando', 'regularizacao', 'acordo', 'normas', 'vigilancia', 'sanitaria', 'conforme', 'projeto', 'reforma', 'unidades', 'boa', 'vista', 'cristovao', 'trocados', 'toda', 'estrutura', 'eletrica', 'telhado', 'central', 'material', 'esterilizacao', 'projeto', 'preve', 'ainda', 'entrada', 'luz', 'natural', 'unidades', 'valor', 'investimentos', 'unidades', 'urs', 'boa', 'vista', 'urs', 'cristovao', 'segundo', 'chefe', 'departamento', 'atencao', 'especializada', 'sheron', 'hellen', 'silva', 'prazo', 'obra', 'meses', 'reforma', 'acontecendo', 'melhorar', 'estrutura', 'unidades', 'usuario', 'trabalha

In [None]:
# stem words ???

In [22]:
from pyspark.ml.feature import CountVectorizer
cv_model = CountVectorizer(inputCol="filteredTKComNot", outputCol="TFfilteredTKComNot")

In [26]:
vocabulary_size = cv_model.getVocabSize()
vocabulary_size

262144

In [23]:
# Option 1
cv_model_fit = cv_model.fit(data)
data = cv_model_fit.transform(data)

In [28]:
vocab = cv_model_fit.vocabulary
vocab_broadcast = sc.broadcast(vocab)

In [None]:
vocab

In [None]:
# Option 2
#hash_tf = HashingTF(inputCol="filteredTKComNot", outputCol="TFfilteredTKComNot", numFeatures=vocabulary_size)
#df_01_2017 = hash_tf.transform(df_01_2017)

In [24]:
idf = IDF(inputCol="TFfilteredTKComNot", outputCol="IDFfilteredTKComNot", minDocFreq=5)
idf_model = idf.fit(data)
data = idf_model.transform(data)

In [25]:
# Split data 0,7 0,3
splits = data.randomSplit([0.7, 0.3], seed=1234)
train = splits[0]
test = splits[1]

In [None]:
train.count()

In [None]:
test.count()

In [26]:
# LDA
# https://github.com/zaratsian/Spark/blob/master/text_analytics_datadriven_topics.py
# https://towardsdatascience.com/apache-spark-hashing-or-dictionary-d23c0e046a19
# https://www.3pillarglobal.com/insights/topic-clusters-tf-idf-vectorization-using-apache-spark?cn-reloaded=1
# http://spark.apache.org/docs/2.2.0/api/python/pyspark.ml.html?highlight=minhash%20lsh
# https://docs.scipy.org/doc/scipy/reference/generated/scipy.spatial.distance.jaccard.html
from pyspark.ml.linalg import Vectors, SparseVector, DenseVector
from pyspark.ml.clustering import LDA, DistributedLDAModel, LocalLDAModel

In [27]:
lda = LDA(k=10, seed=1234, featuresCol="IDFfilteredTKComNot", optimizer="em")
lda_model = lda.fit(train)

In [33]:
local_model = lda_model.toLocal()

In [28]:
# Store model
lda_path = "/home/diegomarques/Documents/Mestrado/lda_path/"

#lda.save(lda_path + "lda")
#same_lda = LDA.load(lda_path + "lda")

In [30]:
lda_model.save(lda_path + "lda_model_distributed_sample")
distributed_lda_model = DistributedLDAModel.load(lda_path + "lda_model_distributed_sample")

In [47]:
#local_model.save(lda_path + "lda_model_local")
#same_local_model = LocalLDAModel.load(lda_path + "lda_model_local")

In [31]:
#data = sc.read.load(path + "/spark_df/" + "mes_df")
data.write.save(path + "/spark_df/" + "mes_df_sample_10")

In [16]:
#ldatopics = lda_model.describeTopics()
ldatopics = distributed_lda_model.describeTopics()

In [17]:
ldatopics.show(3)

+-----+--------------------+--------------------+
|topic|         termIndices|         termWeights|
+-----+--------------------+--------------------+
|    0|[87, 40, 8, 61, 1...|[0.00130828987925...|
|    1|[268, 469, 1033, ...|[0.00205568631659...|
|    2|[33, 94, 127, 331...|[0.01258220107946...|
+-----+--------------------+--------------------+
only showing top 3 rows



In [32]:
def map_termID_to_Word(termIndices):
    words = []
    for termID in termIndices:
        words.append(vocab_broadcast.value[termID])
    
    return words

udf_map_termID_to_Word = udf(map_termID_to_Word , ArrayType(StringType()))

In [53]:
ldatopics_mapped = ldatopics.withColumn("topic", udf_map_termID_to_Word(ldatopics.termIndices))

In [55]:
ldatopics_mapped.select(ldatopics_mapped.topic).show(10, False)

+----------------------------------------------------------------------------------------------------+
|topic                                                                                               |
+----------------------------------------------------------------------------------------------------+
|[tribunal, saude, governo, justica, federal, estado, lei, lula, processo, deputado]                 |
|[copa, selecao, israel, contra, jogo, jogadores, mundo, russia, time, clube]                        |
|[la, el, en, los, del, un, con, las, una, su]                                                       |
|[mercado, cento, indice, alta, shopping, trimestre, dolar, milhoes, brasil, dub]                    |
|[saude, foro, jogo, contra, pontos, pessoas, vitoria, time, dia, casos]                             |
|[governo, caminhoneiros, diesel, preco, bilhoes, reducao, ano, credito, milhoes, petrobras]         |
|[evento, festival, anos, local, cidade, pessoas, policia, brasil, tambem

In [None]:
data.show(1, False)

In [33]:
#data_ldatopics_mapped = lda_model.transform(data.select('CodNot', 'ComNot', 'edtNome'))
data_ldatopics_mapped = distributed_lda_model.transform(train.select('CodNot', 'ComNot', 'edtNome', 'IDFfilteredTKComNot'))

In [37]:
data_ldatopics_mapped.take(10)

[Row(CodNot=49204549, ComNot='    e  de maio de  | sexta e sabado h e domingo h em pleno tempo de homenagens a tantos icones da historia artisticocultural de toda ordem surgiu a ideia de se falar sobre um grande heroi francisco de assis tudo com muito humor como e caracteristico na trajetoria de sucesso  de carlos nunes nos palcos mineiros isso alem da riqueza historica e uma boa dose de poesia ele divide o palco com o ator andre mauricio com direcao do proprio carlos nunes e texto de marcio ares o espetaculo traduz a imagem nao so do santo mas do homem genial que foi francisco da sua infancia ate a sua morte tomase dele a humanidade com que enfrentou o seu desafio de mais amar que ser amado a ideia e fazer rir sem ofender e divertir sem blasfemar sem ferir a aura de santo que lhe e peculiar o espetaculo mostra sob uma nova e divertida perspectiva a sua fraterna existencia sobre a terra explica o ator a italia do seculo xiii onde e quando viveu francisco vai se estender ate os tempos a

In [34]:
import numpy as np
import pandas as pd

In [35]:
data_td_as_arr = data_ldatopics_mapped.select('CodNot', 'topicDistribution')

In [38]:
data_td_as_arr = data_td_as_arr.select('topicDistribution').rdd.map(lambda v: (v,)).toDF()

In [39]:
data_td_as_arr = data_td_as_arr.select(col('_1').alias('topicDistributionArray'))
data_td_as_arr.show(1)

+----------------------+
|topicDistributionArray|
+----------------------+
|  [[0.0100520314523...|
+----------------------+
only showing top 1 row



In [40]:
df_topdist_pd = data_td_as_arr.toPandas()

In [42]:
df_ldatopics_mapped_pd = data_ldatopics_mapped.select('CodNot').toPandas()

In [57]:
df_topdist_pd.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 351910 entries, 0 to 351909
Data columns (total 1 columns):
topicDistributionArray    351910 non-null object
dtypes: object(1)
memory usage: 2.7+ MB


In [43]:
s_topdist_pd = df_topdist_pd['topicDistributionArray'].apply(lambda x : np.array(x))

In [44]:
s_topdist_pd_1d = s_topdist_pd.apply(lambda x : x[0].tolist())

In [46]:
len(s_topdist_pd_1d)

40749

In [49]:
doc_topic_dist = np.array([dist for dist in s_topdist_pd_1d])

In [50]:
doc_topic_dist.shape

(40749, 10)

In [52]:
s_topdist_pd_1d[0]

[0.010052030551846586,
 0.5116306687664162,
 0.01966189621965006,
 0.06561168663255888,
 0.008128692410695245,
 0.0235805545580835,
 0.322005697016139,
 0.0135403822563051,
 0.014270356240455495,
 0.01151803534784975]

In [54]:
doc_topic_dist[0][0]

0.010052030551846586

In [55]:
def jensen_shannon(query, matrix):
    """
    This function implements a Jensen-Shannon similarity
    between the input query (an LDA topic distribution for a document)
    and the entire corpus of topic distributions.
    It returns an array of length M where M is the number of documents in the corpus
    """
    # lets keep with the p,q notation above
    p = query[None,:].T # take transpose
    q = matrix.T # transpose matrix
    m = 0.5*(p + q)
    return np.sqrt(0.5*(entropy(p,m) + entropy(q,m)))

In [56]:
def get_most_similar_documents(query,matrix,k=10):
    """
    This function implements the Jensen-Shannon distance above
    and retruns the top k indices of the smallest jensen shannon distances
    """
    sims = jensen_shannon(query,matrix) # list of jensen shannon distances
    return sims.argsort()[:k] # the top k positional index of the smallest Jensen Shannon distances

In [58]:
test_ldatopics_mapped = distributed_lda_model.transform(test.select('CodNot', 'ComNot', 'edtNome', 'IDFfilteredTKComNot'))

In [59]:
test_ldatopics_mapped.show(1)

+--------+--------------------+--------------+--------------------+--------------------+
|  CodNot|              ComNot|       edtNome| IDFfilteredTKComNot|   topicDistribution|
+--------+--------------------+--------------+--------------------+--------------------+
|48409741|contestado pela m...|ESPAÇO PÚBLICO|(262144,[1,3,7,8,...|[0.17360439508453...|
+--------+--------------------+--------------+--------------------+--------------------+
only showing top 1 row



In [60]:
test_td_as_arr = test_ldatopics_mapped.select('topicDistribution').rdd.map(lambda v: (v,)).toDF(['topicDistributionArray'])
test_df_topdist_pd = test_td_as_arr.toPandas()
test_s_topdist_pd = test_df_topdist_pd['topicDistributionArray'].apply(lambda x : np.array(x))
test_s_topdist_pd_1d = test_s_topdist_pd.apply(lambda x : x[0].tolist())
test_doc_topic_dist = np.array([dist for dist in test_s_topdist_pd_1d])
test_doc_topic_dist.shape

(17089, 10)

In [61]:
test_doc_topic_dist[0]

array([0.17360453, 0.08760329, 0.04224785, 0.07922117, 0.03858348,
       0.2096015 , 0.05179184, 0.03945537, 0.22744176, 0.05044919])

In [63]:
from scipy.stats import entropy

In [64]:
most_sim_ids = get_most_similar_documents(test_doc_topic_dist[0],doc_topic_dist)

  pk = 1.0*pk / np.sum(pk, axis=0)


In [69]:
most_similar_df = s_topdist_pd_1d[s_topdist_pd_1d.index.isin(most_sim_ids)]

In [70]:
most_similar_df

2407     [0.15996437963495355, 0.12131099995005516, 0.0...
6955     [0.15870438194950742, 0.05598691165747272, 0.0...
11320    [0.19564898132386366, 0.133561075874659, 0.065...
11861    [0.21346027032688789, 0.08515615970557563, 0.0...
18074    [0.19581382248155207, 0.048506229101834476, 0....
19931    [0.24564859569345351, 0.06647258757557879, 0.0...
27795    [0.2017847572894229, 0.0729911738779661, 0.046...
28082    [0.20267032437007843, 0.051720040300635865, 0....
28336    [0.15400472240635604, 0.10532362971065377, 0.0...
30707    [0.10974550599607732, 0.07713796187050347, 0.0...
Name: topicDistributionArray, dtype: object

In [80]:
data_ldatopics_mapped_with_idx = data_ldatopics_mapped.select(
    'CodNot', 'ComNot', 'edtNome').rdd.zipWithIndex().toDF(['conteudo'])
data_ldatopics_mapped_with_idx = data_ldatopics_mapped_with_idx.select('conteudo', col('_2').alias('idx'))

In [82]:
data_ldatopics_mapped_with_idx.show(1)

+--------------------+---+
|            conteudo|idx|
+--------------------+---+
|[48417179, dezena...|  0|
+--------------------+---+
only showing top 1 row



In [150]:
most_sim_ids_arr = sorted(most_sim_ids.tolist())
most_sim_ids_arr

[2407, 6955, 11320, 11861, 18074, 19931, 27795, 28082, 28336, 30707]

In [151]:
data_in_train_similar = data_ldatopics_mapped_with_idx.filter(col('idx').isin(most_sim_ids_arr) == True)

In [152]:
row_to_CodNot = udf(lambda x: x[0])
row_to_ComNot = udf(lambda x: x[1])
row_to_edtNome = udf(lambda x: x[2])
data_in_train_similar = data_in_train_similar.withColumn('CodNot', lit(None))
data_in_train_similar = data_in_train_similar.withColumn('ComNot', lit(None))
data_in_train_similar = data_in_train_similar.withColumn('edtNome', lit(None))
data_in_train_similar.printSchema()

root
 |-- conteudo: struct (nullable = true)
 |    |-- CodNot: long (nullable = true)
 |    |-- ComNot: string (nullable = true)
 |    |-- edtNome: string (nullable = true)
 |-- idx: long (nullable = true)
 |-- CodNot: null (nullable = true)
 |-- ComNot: null (nullable = true)
 |-- edtNome: null (nullable = true)



In [153]:
data_in_train_similar = data_in_train_similar.withColumn('CodNot', row_to_CodNot(test.conteudo))
data_in_train_similar = data_in_train_similar.withColumn('ComNot', row_to_ComNot(test.conteudo))
data_in_train_similar = data_in_train_similar.withColumn('edtNome', row_to_edtNome(test.conteudo))
data_in_train_similar = data_in_train_similar.select('idx', 'CodNot', 'ComNot', 'edtNome')
data_in_train_similar.printSchema()

root
 |-- idx: long (nullable = true)
 |-- CodNot: string (nullable = true)
 |-- ComNot: string (nullable = true)
 |-- edtNome: string (nullable = true)



In [135]:
print('To search')
test_ldatopics_mapped.select('CodNot', 'ComNot').head(1)

To search


[Row(CodNot=48409741, ComNot='contestado pela maiorpartedos moto ristas sobretudo por conta do preco sdas tarifas o pedagio cobrado em trechos concessionados da regido dos cam pos gerais e revertido em mehorias e parte do dinheiro arrecadado e destinado excusi vamente para o servia de socorronumeros disponibilizados pela rodo norte comprovam a eficiencia do servi co de socorro nesses  anos de ativida des nas rr s i  e  e tia pr  as equipes da concessionaria atende ram mais de i milhdo de ocorrencias sendo mais de r mil veiculos atendidos ocorrencias mecanicas e mais de  mi situacoes de atendimento pre hospitalar iaph so no ano jxissado foram mais de  mi ocorrenciasatenddas nos ir quilometros cuidados pela companhia noparana alem do volume de ocorrencias atendidas nestes  anos outra marca ex pressiva esta na reducao de lo no indi ce de obitos e  no indice de veiculos se compararmos os anos de  ultimo ano sem o programa de concessoes no pa rana e o pedagio existe porque o htado nao tinha

In [155]:
print('Similar')
data_in_train_similar.select('CodNot', 'ComNot').head(1)

Similar


[Row(CodNot='48874289', ComNot='a nova fase da atuacao das forcas de seguranca na regiao preve a retomada de areas que incluem as comunidade da vila kennedy e do batan a nota divugada pelo gabinete de intervencao federal informa que os projetos sociais ate entao conduzidos pelas upps serao mantidos pelo o batalhao da policia militar a partir desta quartafeira')]

In [158]:
data_in_train_similar.filter(col('idx') == most_sim_ids_arr[1]).head()

Row(idx=6955, CodNot='49334974', ComNot='a feira de gado de caruaru no agreste de pernambuco que e realizada habitualmente as tercasfeiras das h as h esta acontecendo normalmente nesta tercafeira  apesar da greve dos caminhoneiros a feira que recebe milhares de pessoas durante a semana registra poucos comerciantes e compradores nesta terca matadouro de acordo com a secretaria de servicos publicos da cidade o matadouro publico tambem esta funcionando normalmente nesta tercafeira', edtNome='INTERIOR')

In [142]:
data_in_train_similar.count()

10

In [None]:
#test_df_ldatopics_mapped_pd = test_ldatopics_mapped.select('CodNot').toPandas()

In [None]:
#ll = lda_model.logLikelihood(data)
#print("The lower bound on the log likelihood of the entire corpus: " + str(ll))

In [None]:
#lp = lda_model.logPerplexity(df_01_2017)
#print("The upper bound on perplexity: " + str(lp))