## TAREA MapReduce - MINERÍA DE DATOS
**Estudiante:** Nadiabeth Diana Mallqui Apaza    
**Código:** 171063

In [23]:
# Importamos la librería "findspark"
import findspark
# "findspark.init()" permite que pyspark se pueda importar como una biblioteca regular.
findspark.init()

In [24]:
# Importamos la librerías a utilizar
import math
from pyspark import SparkContext
from pyspark.sql import SparkSession
sc = SparkContext.getOrCreate()
spark_session = SparkSession(sc)

#### **Lectura de datos**

In [25]:
# Leemos nuestro archivo de texto
data = sc.textFile("prueba.txt")
# Lo almacenamos en lines para que pueda ser iterable
lines = data.collect()
# Creamos una lista donde almacenaremos cada linea u oración del archivo
sentences =[]
for line in lines:
    if (line != ""): # Sin las lineas vacías
        sentences.append(line)

In [26]:
# Creamos otra lista para almacenar los documentos
documents = []
for i in range(0,len(sentences)):
    documents.append((i,sentences[i]))
# Crearemos un DataFrame para mostrar los documentos con sus respectivos labels
documentsData = spark_session.createDataFrame(documents, ["label", "sentence"]) 
# Mostramos los documentos en DataFrame
documentsData.show(truncate=False)

+-----+-----------------------------------------------------------------------------------+
|label|sentence                                                                           |
+-----+-----------------------------------------------------------------------------------+
|0    |protester stage diein protest store nyc anger misplace retweet agree               |
|1    |apple intraday comment update range premium user aapl stockaction trading stock mkt|
|2    |ios app 2014 name elevate brain training iphone app                                |
|3    |shit                                                                               |
|4    |founder attack boss ridiculous comment ad                                          |
|5    |evidence factcheckthis                                                             |
|6    |educate                                                                            |
|7    |hard reach buy suicide squad comic_strip apple store turkey              

### **BoW**

In [31]:
# Paraleliazmos nuestro data
lines=sc.parallelize(documents)
# Recupera todos los elementos del lines
lines.collect()
map1=lines.flatMap(lambda x: [((x[0],i),1) for i in x[1].split()])
# Recupera todos los elementos del map1
map1.collect()
#Map Reduce
reduce=map1.reduceByKey(lambda x,y:x+y)
#BOG de las 4 oraciones
reduce.reduceByKey(lambda x,y:x+y).take(50)

[((0, 'protest'), 1),
 ((0, 'anger'), 1),
 ((0, 'misplace'), 1),
 ((0, 'retweet'), 1),
 ((1, 'range'), 1),
 ((1, 'user'), 1),
 ((1, 'trading'), 1),
 ((1, 'stock'), 1),
 ((1, 'mkt'), 1),
 ((2, '2014'), 1),
 ((2, 'name'), 1),
 ((6, 'educate'), 1),
 ((7, 'buy'), 1),
 ((7, 'apple'), 1),
 ((8, 'delete'), 1),
 ((8, 'music'), 1),
 ((9, 'range'), 1),
 ((9, 'user'), 1),
 ((9, 'trading'), 1),
 ((9, 'stock'), 1),
 ((9, 'mkt'), 1),
 ((10, '45,000'), 1),
 ((11, 'apple'), 1),
 ((12, 'love'), 1),
 ((13, 'geek'), 1),
 ((14, 'onlinefootprint'), 1),
 ((15, 'agree'), 1),
 ((16, 'fuck'), 1),
 ((16, 'make'), 1),
 ((16, 'shit'), 1),
 ((18, 'asus'), 1),
 ((18, 'freezing'), 1),
 ((18, 'laggy'), 1),
 ((18, 'excellence'), 1),
 ((19, 'amp'), 1),
 ((19, 'iphone'), 1),
 ((19, 'want'), 1),
 ((19, 'eat'), 1),
 ((19, 'place'), 1),
 ((20, 'class'), 1),
 ((21, 'ability'), 1),
 ((22, 'aapl'), 1),
 ((22, '2014'), 1),
 ((23, 'robertson'), 1),
 ((23, 'bullish'), 1),
 ((23, 'apple'), 1),
 ((24, 'update'), 1),
 ((25, 'hold')

### Algoritmo TF - IDF

In [30]:
# Almacenamos en la variable data los documentos a tratar
data= documents
lines=sc.parallelize(data)
lines.collect()

# Asignamos nuestro valor existente a un nuevo par clave-valor que comprende:
    # - ID de documento
    # - token como clave
    # - 1 (que representa el recuento) como valor
    
# Usaremos "flatMap" para combinar todos los tokens en una sola lista
map_1 = lines.flatMap(lambda x: [((x[0],i),1) for i in x[1].split()])
map_1.collect()

# Agrupamos los pares clave con la clave común y agregaremos los valores 
# de la misma clave para obtener la frecuencia del término (TF)
reduce = map_1.reduceByKey(lambda x,y:x+y)
reduce.collect()

# Realizamos un nuevo mapeo
TF = reduce.map(lambda x: (x[0][1],(x[0][0],x[1])))
TF.collect()

# Calculamos la frecuencia inversa de documentos (IDF)

# Primero, asignamos el valor anterior a un nuevo par clave 
    # Donde: 
    # la clave será el token 
    # su valor será el identificador TF del documento
map_3 = reduce.map(lambda x: (x[0][1],(x[0][0],x[1],1)))
map_3.collect()

# Luego, extraemos el token y el número de contador
map_4 = map_3.map(lambda x:(x[0],x[1][2]))
map_4.collect()

# Por último, reducimos por clave para obtener el recuento de documentos 
# que contienen un token i particular 
reduce_2 = map_4.reduceByKey(lambda x,y:x+y)
reduce_2.collect()

# Así obtenemos el (IDF)
IDF = reduce_2.map(lambda x: (x[0],math.log10(len(data)/x[1])))
IDF.collect()

# Calculamos el (TF-IDF)

# Ya que tenemos dos RDD's realizaremos una combinación interna para asignar 
# a cada token una identificación de documento, TF y puntuación IDF.
RDD = TF.join(IDF)
RDD.collect()

# Multiplicamos el TF e IDF de cada token asociado con la identificación del documento respectivo
RDD = RDD.map(lambda x: (x[1][0][0],(x[0],x[1][0][1],x[1][1],x[1][0][1]*x[1][1]))).sortByKey()
RDD.collect()

# Por último, convertimos la salida final a un marco de datos de Pyspark para visualizar las 
# puntuaciones con mayor claridad
RDD = RDD.map(lambda x: (x[0],x[1][0],x[1][1],x[1][2],x[1][3]))
hasattr(RDD, "toDF")
RDD.toDF(["DocumentId","Token","TF","IDF","TF-IDF"]).show(50)

+----------+-------------+---+------------------+------------------+
|DocumentId|        Token| TF|               IDF|            TF-IDF|
+----------+-------------+---+------------------+------------------+
|         0|        store|  1|0.9777236052888477|0.9777236052888477|
|         0|        diein|  1|1.1026623418971477|1.1026623418971477|
|         0|      retweet|  1|1.1026623418971477|1.1026623418971477|
|         0|        agree|  1|1.1026623418971477|1.1026623418971477|
|         0|        stage|  1|1.1026623418971477|1.1026623418971477|
|         0|          nyc|  1|1.1026623418971477|1.1026623418971477|
|         0|    protester|  1|1.1026623418971477|1.1026623418971477|
|         0|      protest|  1|1.1026623418971477|1.1026623418971477|
|         0|        anger|  1|1.1026623418971477|1.1026623418971477|
|         0|     misplace|  1|1.1026623418971477|1.1026623418971477|
|         1|        range|  1|1.1026623418971477|1.1026623418971477|
|         1|      trading|  1|1.10