# TF-IDF - Code to measure the importance of a word in a document

Import the Wikipedia subset stored in an S3 bucket.

In [48]:
rawdata = spark.read.options(sep="\t").csv("s3://aws-emr-studio-904233096976-us-east-2/1757979133869/subset-small.tsv")
rawdata.show()

+---+--------------------+-------------------+--------------------+
|_c0|                 _c1|                _c2|                 _c3|
+---+--------------------+-------------------+--------------------+
| 12|           Anarchism|2008-12-30 06:23:05|Anarchism (someti...|
| 25|              Autism|2008-12-24 20:41:05|Autism is a brain...|
| 39|              Albedo|2008-12-29 18:19:09|The albedo of an ...|
|290|                   A|2008-12-27 04:33:16|The letter A is t...|
|303|             Alabama|2008-12-29 08:15:47|Alabama (formally...|
|305|            Achilles|2008-12-30 06:18:01|thumb\n\nIn Greek...|
|307|     Abraham Lincoln|2008-12-28 20:18:23|Abraham Lincoln (...|
|308|           Aristotle|2008-12-29 23:54:48|Aristotle (Greek:...|
|309|An American in Paris|2008-09-27 19:29:28|An American in Pa...|
|324|       Academy Award|2008-12-28 17:50:43|The Academy Award...|
|330|             Actrius|2008-05-23 15:24:32|Actrius (Actresse...|
|332|     Animalia (book)|2008-12-18 11:12:34|th

Since the dataset has no column names defined, we assign them manually:

In [49]:
articles = rawdata.toDF("ID", "Title", "Time", "Document")
articles.show()

+---+--------------------+-------------------+--------------------+
| ID|               Title|               Time|            Document|
+---+--------------------+-------------------+--------------------+
| 12|           Anarchism|2008-12-30 06:23:05|Anarchism (someti...|
| 25|              Autism|2008-12-24 20:41:05|Autism is a brain...|
| 39|              Albedo|2008-12-29 18:19:09|The albedo of an ...|
|290|                   A|2008-12-27 04:33:16|The letter A is t...|
|303|             Alabama|2008-12-29 08:15:47|Alabama (formally...|
|305|            Achilles|2008-12-30 06:18:01|thumb\n\nIn Greek...|
|307|     Abraham Lincoln|2008-12-28 20:18:23|Abraham Lincoln (...|
|308|           Aristotle|2008-12-29 23:54:48|Aristotle (Greek:...|
|309|An American in Paris|2008-09-27 19:29:28|An American in Pa...|
|324|       Academy Award|2008-12-28 17:50:43|The Academy Award...|
|330|             Actrius|2008-05-23 15:24:32|Actrius (Actresse...|
|332|     Animalia (book)|2008-12-18 11:12:34|th

Next, we clean the data. First, check whether any documents are null.

In [50]:
articles.filter(articles.Document.isNull()).show()

+----+--------------------+-------------------+--------+
|  ID|               Title|               Time|Document|
+----+--------------------+-------------------+--------+
|1109|Geography of Amer...|2008-09-29 11:21:06|    null|
+----+--------------------+-------------------+--------+

There is one null document, so we drop it.

In [51]:
cleanedArticles = articles.filter(articles.Document.isNotNull())  # Keep only rows that have text
cleanedArticles.filter(cleanedArticles.Document.isNull()).show()  # Sanity check: should print an empty table

+---+-----+----+--------+
| ID|Title|Time|Document|
+---+-----+----+--------+
+---+-----+----+--------+

Because TF-IDF operates on numbers rather than words, we need to preprocess the data before running the algorithm. First we tokenize each document by splitting it into words; then we store, for each document, a vector holding the numeric representation (hashing) of its words.
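
The tokenize-then-hash step described above can be sketched in plain Python, without Spark. This is only an illustration under stated assumptions: `tokenize`, `bucket` and `hashed_term_frequencies` are hypothetical helper names, the sample sentence is made up, and CRC32 is used as a stand-in hash, so the indices it produces will not match the ones Spark assigns. The vector size 262144 is the one visible in the `rawFeatures` output.

```python
import zlib
from collections import Counter

NUM_FEATURES = 262144  # 2**18, the vector size shown in the rawFeatures column


def tokenize(text):
    # Lowercase and split on whitespace, as the token lists in the output suggest
    return text.lower().split()


def bucket(word, num_features=NUM_FEATURES):
    # Stand-in hash (CRC32), NOT Spark's hash: indices will differ from Spark's
    return zlib.crc32(word.encode("utf-8")) % num_features


def hashed_term_frequencies(text, num_features=NUM_FEATURES):
    # Count how many tokens land in each bucket; colliding words share one count
    counts = Counter(bucket(word, num_features) for word in tokenize(text))
    return dict(sorted(counts.items()))


# Made-up sample sentence, used only for illustration
tf = hashed_term_frequencies("The albedo of an object is the albedo")
print(tf)
```

Two different words can land in the same bucket, in which case their counts are merged. A large vector size keeps such collisions rare but does not rule them out.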

In [52]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

tokenizer = Tokenizer(inputCol="Document", outputCol="words")  # Turns each string into a list of lowercase words
wordsData = tokenizer.transform(cleanedArticles)

In [53]:
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures")  # Maps each word list to a sparse vector: (size, [hashed indices], [counts])
featurizedData = hashingTF.transform(wordsData)
featurizedData.show()

+---+--------------------+-------------------+--------------------+--------------------+--------------------+
| ID|               Title|               Time|            Document|               words|         rawFeatures|
+---+--------------------+-------------------+--------------------+--------------------+--------------------+
| 12|           Anarchism|2008-12-30 06:23:05|Anarchism (someti...|[anarchism, (some...|(262144,[116,120,...|
| 25|              Autism|2008-12-24 20:41:05|Autism is a brain...|[autism, is, a, b...|(262144,[521,1546...|
| 39|              Albedo|2008-12-29 18:19:09|The albedo of an ...|[the, albedo, of,...|(262144,[1625,179...|
|290|                   A|2008-12-27 04:33:16|The letter A is t...|[the, letter, a, ...|(262144,[5303,603...|
|303|             Alabama|2008-12-29 08:15:47|Alabama (formally...|[alabama, (formal...|(262144,[93,115,3...|
|305|            Achilles|2008-12-30 06:18:01|thumb\n\nIn Greek...|[thumb\n\nin, gre...|(262144,[305,991,...|
|307|     

We now have the TF (term frequency: how often each word occurs in each document) and need to compute the TF-IDF (term frequency-inverse document frequency). This value is computed for every term in every document and stored in "features", a sparse vector per document with one position per hashed term.
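
As a rough illustration of what the IDF weighting does, here is a minimal sketch. It assumes the smoothed formula given in Spark's documentation, idf(t) = log((m + 1) / (df(t) + 1)), where m is the number of documents and df(t) the number of documents containing term t. The function names and the 1000-document corpus are hypothetical.

```python
import math


def idf(num_docs, doc_freq):
    # Smoothed inverse document frequency: log((m + 1) / (df + 1))
    return math.log((num_docs + 1) / (doc_freq + 1))


def tf_idf(term_count, num_docs, doc_freq):
    # TF-IDF is the raw term count multiplied by the term's IDF weight
    return term_count * idf(num_docs, doc_freq)


# Hypothetical corpus of 1000 documents
common = idf(1000, 999)  # term found in almost every document: weight near 0
rare = idf(1000, 9)      # term found in only 9 documents: much larger weight
print(common, rare, tf_idf(3, 1000, 9))
```

A term that occurs in every document gets weight 0, so it contributes nothing to the score no matter how often it is repeated.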

In [54]:
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)  # Counts how many documents each term occurs in and computes a weight for each vector position
rescaledData = idfModel.transform(featurizedData)  # Applies the fitted model to each document, multiplying TF by IDF

In [55]:
rescaledData.show()

+---+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+
| ID|               Title|               Time|            Document|               words|         rawFeatures|            features|
+---+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+
| 12|           Anarchism|2008-12-30 06:23:05|Anarchism (someti...|[anarchism, (some...|(262144,[116,120,...|(262144,[116,120,...|
| 25|              Autism|2008-12-24 20:41:05|Autism is a brain...|[autism, is, a, b...|(262144,[521,1546...|(262144,[521,1546...|
| 39|              Albedo|2008-12-29 18:19:09|The albedo of an ...|[the, albedo, of,...|(262144,[1625,179...|(262144,[1625,179...|
|290|                   A|2008-12-27 04:33:16|The letter A is t...|[the, letter, a, ...|(262144,[5303,603...|(262144,[5303,603...|
|303|             Alabama|2008-12-29 08:15:47|Alabama (formally...|[alabama, (forma

Let's work through an example. We want to search for the term "international" and measure its relevance. To do that, we apply the same hash used above to the word. The term is written in lowercase because the tokenizer lowercased the documents.

In [59]:
from pyspark.sql.types import ArrayType, StringType, StructField, StructType

schema = StructType([StructField("words", ArrayType(StringType()))])  # Schema of the DataFrame that will hold the word list

df = spark.createDataFrame([(["international"],)], schema)  # One-row test DataFrame containing only "international"
df.show()

international = hashingTF.transform(df)  # Apply the same hashing used for the articles
international.show()

featureVec = international.select('rawFeatures').collect()  # Collect the rawFeatures column into a list of rows
print(featureVec)

internationalID = int(featureVec[0].rawFeatures.indices[0])  # Vector index (hash bucket) of "international"
print(internationalID)

+---------------+
|          words|
+---------------+
|[international]|
+---------------+

+---------------+--------------------+
|          words|         rawFeatures|
+---------------+--------------------+
|[international]|(262144,[26092],[...|
+---------------+--------------------+

[Row(rawFeatures=SparseVector(262144, {26092: 1.0}))]
26092

From this ID we can read off the TF-IDF value of "international" for each document.
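
Reading one position out of a sparse vector can be pictured as follows. This is a plain-Python sketch, not Spark's implementation: `sparse_get` is a hypothetical helper, and the stored values are invented for illustration (only the index 26092 comes from the output above).

```python
from bisect import bisect_left


def sparse_get(indices, values, position):
    # indices is sorted; any position not listed is an implicit zero
    i = bisect_left(indices, position)
    if i < len(indices) and indices[i] == position:
        return values[i]
    return 0.0


# Hypothetical document vector: the values are made up for illustration
indices = [116, 120, 26092]
values = [2.5, 0.7, 1.8904754]

print(sparse_get(indices, values, 26092))  # stored value at the "international" index
print(sparse_get(indices, values, 5))      # index not stored, so the weight is 0.0
```

This is why documents that never mention the term end up with a score of 0.0.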

In [62]:
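from pyspark.sql.types import FloatType
from pyspark.sql.functions import udf

# UDF that reads the TF-IDF weight stored at the "international" index of a features vector
termExtractor = udf(lambda x: float(x[internationalID]), FloatType())
# Add that weight as a "score" column for every article
gettysburgDF = rescaledData.withColumn('score', termExtractor(rescaledData.features))

gettysburgDF.show()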

+---+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+---------+
| ID|               Title|               Time|            Document|               words|         rawFeatures|            features|    score|
+---+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+---------+
| 12|           Anarchism|2008-12-30 06:23:05|Anarchism (someti...|[anarchism, (some...|(262144,[116,120,...|(262144,[116,120,...|30.247606|
| 25|              Autism|2008-12-24 20:41:05|Autism is a brain...|[autism, is, a, b...|(262144,[521,1546...|(262144,[521,1546...|      0.0|
| 39|              Albedo|2008-12-29 18:19:09|The albedo of an ...|[the, albedo, of,...|(262144,[1625,179...|(262144,[1625,179...|      0.0|
|290|                   A|2008-12-27 04:33:16|The letter A is t...|[the, letter, a, ...|(262144,[5303,603...|(262144,[5303,603...|1.8904754|
|303|        

Ranking by score gives the articles in which "international" carries the most weight.
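
The ranking, and a way to read the scores, can be sketched in plain Python using only the scores visible in the notebook output. One assumption is made explicit here: the score for article "A" (1.8904754) is taken to correspond to a single occurrence of the term, so it equals the IDF weight itself. The variable names are hypothetical.

```python
# (ID, Title, score) triples copied from the notebook output
rows = [
    (12, "Anarchism", 30.247606),
    (25, "Autism", 0.0),
    (39, "Albedo", 0.0),
    (290, "A", 1.8904754),
    (2386, "American Airlines", 30.247606),
    (1942, "Airline", 26.466656),
]

# Keep matching articles only, then sort by score descending; ID breaks ties
ranked = sorted((r for r in rows if r[2] > 0), key=lambda r: (-r[2], r[0]))

# Assumption: 1.8904754 is the score of a single occurrence, i.e. the IDF weight
idf_weight = 1.8904754
implied_counts = {title: round(score / idf_weight) for _, title, score in ranked}

print(ranked)
print(implied_counts)
```

Under that assumption, each score is a whole multiple of the IDF weight, which is the number of tokens hashed to the "international" index in that article. Hash collisions can inflate this count. Articles with equal scores may appear in either order in Spark's result, so the sketch adds the ID as a tie-breaker to make the order deterministic.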

In [63]:
sortedResults = gettysburgDF.filter("score > 0").orderBy('score', ascending=False).select('ID', 'Title', 'Document', 'score')
sortedResults.show(truncate=100)

+----+-------------------------------------+----------------------------------------------------------------------------------------------------+---------+
|  ID|                                Title|                                                                                            Document|    score|
+----+-------------------------------------+----------------------------------------------------------------------------------------------------+---------+
|2386|                    American Airlines|American Airlines, Inc. (AA) is a US-based airline, the world's largest in passenger miles transp...|30.247606|
|  12|                            Anarchism|Anarchism (sometimes referred to as libertarianism, though that term sometimes has other meanings...|30.247606|
|1942|                              Airline|An airline provides air transport services for passenger or freight, generally with a recognized ...|26.466656|
|1216|                               Athens|Athens (; , Athina, 