24) Dado un tamaño de vocabulario parametrizable y una lista de stopwords también parametrizable implemente tf-IDF para los textos de los contenidos de forma distribuida. Debe obtener un vector por cada texto (⭐⭐⭐).

**Analisis previo**

1) La respuesta al ejercicio sera un RDD cuyos valores definen unívocamente la matriz de TF-IDF (y por lo tanto definen el vector para cada texto).

2) Una palabra debe tener 2 caracteres como minimo para ser considerada.

3)Tenemos valores None en la columna "text" la cual nos interesa, entonces antes de trabajar eliminamos estos datos.

In [None]:
!pip install pyspark
!pip install -U -q PyDrive
!apt update
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext
from pyspark.sql import SQLContext

from google.colab import drive
drive.mount("/content/drive")

import math
import re
import nltk
from nltk.corpus import stopwords
nltk.download('stopwords')

In [None]:
featuresParametrizable = 5

stopwordsParametrizable = set(stopwords.words('spanish')).union(stopwords.words('english'))

### **Resolucion explicativa**

In [None]:
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

sqlContext = SQLContext(sc)
df = sqlContext.read.csv('/content/drive/MyDrive/Organizacion de Datos/Colab Notebooks/TP1/contents_text_sample.csv', header=True, inferSchema=True, escape='"', multiLine=True)
rdd = df.rdd



Pasos

1) Con la transformacion map() buscamos obtener registros de la forma (id de texto, lista de palabras del texto) eliminando los caracteres que no nos interesan.

2) Con la transformacion map() buscamos obtener registros de la forma (id de texto, lista de palabras del texto) donde además eliminamos aquellas palabras que no cumplen con tener 2 caracteres como minimo y aquellas palabras que son stopwords.

In [None]:
rdd_map = rdd.filter(lambda x: x.text != None).map(lambda x: (x.id, re.findall(r"\w+(?:'\w+)?|[^\w\s]", x.text.lower())))\
             .map(lambda x: (x[0], [palabra for palabra in x[1] if ((len(palabra) > 1) and (palabra not in stopwordsParametrizable))])).cache()

cantidad_documentos = rdd_map.count()

Pasos: Hallamos las K palabras mas comunes. Notemos que K es el tamaño de vocabulario parametrizable que se menciona en el enunciado.

4) Con la transformacion flatMap() buscamos unir todas las palabras en un unico registro.

5) Con la transformacion map() buscamos obtener registros de la forma (palabra, 1) para poder contar la frecuencia de dichas palabras.

6) Con la transformacion reduceByKey() buscamos sumar los registros para una misma clave.

7) Con la accion takeOrdered() buscamos hallar las primeras K palabras de mayor a menor. Es decir, las K mas frecuentes.

In [None]:
palabras_comunes = rdd_map.flatMap(lambda x: x[1]).map(lambda x: (x,1)).reduceByKey(lambda x,y: x+y).takeOrdered(featuresParametrizable, lambda x: -x[1])
palabras_comunes = [x[0] for x in palabras_comunes]

Pasos: Contamos la frecuencia de las K palabras mas comunes en cada documento.

8) Con la transformacion flatMap() buscamos obtener registros de la forma ((id de texto, palabra), 1) si la palabra es una de las mas comunes. Utilizamos un ciclo for ya que buscamos recorrer un registro para separarlo en varios.

9) Con la transformacion reduceByKey() buscamos sumar los registros para una misma clave. Es decir, buscamos obtener la frecuencia de cada palabra segun el texto. Notemos que en este RDD nos faltan aquellas palabras comunes que tienen frecuencia 0.

In [None]:
rdd_flatmap = rdd_map.flatMap(lambda x: [((x[0], palabra), 1) for palabra in x[1] if palabra in palabras_comunes]).cache()
rdd_frecuencias_por_texto = rdd_flatmap.reduceByKey(lambda x,y: x+y)

Pasos: Construimos un vector para cada documento de la forma [(Texto, Palabra), Frecuencia].

10) Con la transformacion flatMap() buscamos obtener registros de la forma ((id de texto, palabra), 0) para cada una de las palabras mas comunes. Notemos que  en este RDD nos faltan las frecuencias de cada palabra segun el texto.

11) Con la transformacion rightOuterJoin() buscamos obtener un RDD de la forma ((Texto, Palabra), (Frecuencia, 0)) donde si tenemos un valor distinto de None en Frecuencia entonces quiere decir que dicha palabra esta al menos 1 vez en el texto y nos quedamos con este valor, en caso contrario la palabra no aparece en el texto y nos quedamos con frecuencia igual a 0.

In [None]:
rdd_vectores = rdd_map.flatMap(lambda x: [((x[0], palabra), 0) for palabra in palabras_comunes])
rdd_tf = rdd_frecuencias_por_texto.rightOuterJoin(rdd_vectores).map(lambda x: ((x[0][0], x[0][1]), x[1][0] if x[1][0] != None else x[1][1]))

Pasos: Contamos el numero de documentos que contienen a cada una de las K palabras mas comunes.

12) Con la transformacion distinct() eliminamos los registros duplicados ya que en este caso no nos interesa saber la frecuencia en cada texto, sino que nos interesa el numero de textos que contienen a cada una de las palabras mas comunes.

13) Con la transformacion map() buscamos obtener registros de la forma (palabra, 1) para poder contarlas.

14) Con la accion countByKey() buscamos contar las ocurrencias de registros para cada clave.

In [None]:
frecuencias_por_palabra = rdd_flatmap.distinct().map(lambda x: (x[0][1], x[1])).countByKey()

Pasos: Construimos un vector para cada documento de la forma [(Texto, Palabra), Frecuencia * IDF(Palabra)]

15) Con la transformacion map() buscamos obtener registros de la forma ((Texto, Palabra), Frecuencia * IDF(Palabra)) para asi obtener un RDD cuyos valores definen unívocamente la matriz de TF-IDF.

In [None]:
rdd_tfidf = rdd_tf.map(lambda x: ((x[0][0], x[0][1]), x[1] * math.log((cantidad_documentos+1)/frecuencias_por_palabra[x[0][1]], 10)))

### **Resolucion reducida**

In [None]:
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

sqlContext = SQLContext(sc)
df = sqlContext.read.csv('/content/drive/MyDrive/Organizacion de Datos/Colab Notebooks/TP1/contents_text_sample.csv', header=True, inferSchema=True, escape='"', multiLine=True)
rdd = df.rdd

rdd_map = rdd.filter(lambda x: x.text != None).map(lambda x: (x.id, re.findall(r"\w+(?:'\w+)?|[^\w\s]", x.text.lower())))\
             .map(lambda x: (x[0], [palabra for palabra in x[1] if ((len(palabra) > 1) and (palabra not in stopwordsParametrizable))])).cache()

cantidad_documentos = rdd_map.count()

palabras_comunes = rdd_map.flatMap(lambda x: x[1]).map(lambda x: (x,1)).reduceByKey(lambda x,y: x+y).takeOrdered(featuresParametrizable, lambda x: -x[1])
palabras_comunes = [x[0] for x in palabras_comunes]

rdd_flatmap = rdd_map.flatMap(lambda x: [((x[0], palabra), 1) for palabra in x[1] if palabra in palabras_comunes]).cache()
rdd_frecuencias_por_texto = rdd_flatmap.reduceByKey(lambda x,y: x+y)
rdd_vectores = rdd_map.flatMap(lambda x: [((x[0], palabra), 0) for palabra in palabras_comunes])
rdd_tf = rdd_frecuencias_por_texto.rightOuterJoin(rdd_vectores).map(lambda x: ((x[0][0], x[0][1]), x[1][0] if x[1][0] != None else x[1][1]))

frecuencias_por_palabra = rdd_flatmap.distinct().map(lambda x: (x[0][1], x[1])).countByKey()
frecuencias_por_palabra

rdd_tfidf = rdd_tf.map(lambda x: ((x[0][0], x[0][1]), x[1] * math.log((cantidad_documentos+1)/frecuencias_por_palabra[x[0][1]], 10)))

### **Respuesta**

In [None]:
rdd_tfidf.take(5)

[((127, 'ref'), 12.439900925118277),
 ((127, 'http'), 2.1151711142627394),
 ((127, 'categoría'), 0.5885068827959938),
 ((179, 'ref'), 1.036658410426523),
 ((179, 'http'), 1.5863783356970544)]