**UNIVERSIDAD NACIONAL DE SAN ANTONIO ABAD DEL CUSCO**
> **CURSO:** Minería de datos

> **ALUMNO:**Vega Centeno Olivera, Ronaldinho

## **1.Instalar bibliotecas**

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 38 kB/s 
[?25hCollecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 55.4 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=b58573ca8998d49e97fda0fb06b27c85d9fc08ed77ca10ad394bc6490a28b4e5
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


## **2.Conexion a cluster Spark (SparkContext)**

In [None]:
from pyspark import SparkContext
sc =SparkContext() #Permite la creación y manipulación de RDDs

## **3.Bag of words**

In [119]:
def preprocesar(textRDD):
  """
    Modulo para eliminar lineas en blanco, o doc en blanco y 
    dar estructura a la data

  Parameters
  =========
    `textRDD :` RDD con los documentos
          ej -> [ 'founder attack boss ridiculous comment ad',
                  'evidence factcheckthis',
                  'educate']
  Returns
  =========
    `tupleData :` lista de tuplas (idDoc, Doc)
          ej ->[(5, 'founder attack boss ridiculous comment ad'),
                (6, 'evidence factcheckthis'),
                (7, 'educate')]

  """
  #Eliminar lineas vacias
  textRDD=textRDD.filter(lambda x : x is not '')
  #Crear tuplas para dar forma a la data
  tupleData = []
  i=1
  for line in textRDD.collect():
    tupla=(i,line)
    tupleData.append(tupla)
    i+=1
  return tupleData

In [123]:
def bow(data1,particiones=4):
  """
    a. tokenizamos 
    b. uniformizamos, convertimos a minusculas
    c. agregamos un valor a cada tupla : Ej. -> ((1,'hola',10),1)
    d. filtramos palabras de tamaño mayor a 2 letras

  Parameters
  =========
    `data1 :` lista de tuplas (id, Doc)
          ej -> [(5, 'founder attack boss ridiculous comment ad'),
                (6, 'evidence factcheckthis'),
                (7, 'educate')]
  Returns
  =========
    `parTokenRDD :` lista de tuplas de tuplas ((idDoc, token,len(Doc)), 1)
          ej ->[((1, 'protester',10), 1),
                ((1, 'stage',10), 1),
                ((1, 'diein',10), 1)]

  """
  #Creamos el RDD
  linesRDD=sc.parallelize(data1,particiones)
  parTokenRDD=linesRDD.flatMap(lambda x: [((x[0],i.lower(),len(x[1].split())),1) for i in x[1].split()])
  #filtramos palabras de tamaño mayor a 2 letras
  parTokenRDD2=parTokenRDD.filter(lambda x: len(x[0][1])>2)
  #obtenemos la frecuencia de cada palabra por documento
  frecPerDocumentRDD=parTokenRDD2.reduceByKey(lambda x,y:x+y)
  return frecPerDocumentRDD


In [124]:
#Crear el RDD leyendo el archivo .txt
textRDD=sc.textFile('twitts.txt',4)
datos=preprocesar(textRDD)
datos
BoW=bow(datos)
BoW.take(20)

[((1, 'protest', 10), 1),
 ((1, 'anger', 10), 1),
 ((1, 'misplace', 10), 1),
 ((1, 'retweet', 10), 1),
 ((2, 'range', 12), 1),
 ((2, 'user', 12), 1),
 ((2, 'trading', 12), 1),
 ((2, 'stock', 12), 1),
 ((2, 'mkt', 12), 1),
 ((3, 'ios', 9), 1),
 ((3, 'app', 9), 2),
 ((3, 'brain', 9), 1),
 ((6, 'factcheckthis', 2), 1),
 ((8, 'store', 9), 1),
 ((9, 'customer', 4), 1),
 ((10, 'range', 12), 1),
 ((10, 'user', 12), 1),
 ((10, 'trading', 12), 1),
 ((10, 'stock', 12), 1),
 ((10, 'mkt', 12), 1)]

## **3.TF**

In [125]:
def TF(datos):
  """
  Calcula la frecuencia de cada termino

  Parameters
  =========
    `datos :` lista de tuplas (id, Doc)
          ej -> [(5, 'founder attack boss ridiculous comment ad'),
                (6, 'evidence factcheckthis'),
                (7, 'educate')]

  Returns
  =========
    `tfRDD :`  lista de tuplas de tuplas ('token', ('id',frecuencia/len(doc)))
          ej -> [ ('protester', (1, 0.1)),
                  ('diein', (1, 0.1)),
                  ('agree', (1, 0.1)) ]

  """
  frecuenciasRDD=bow(datos)
  # Mapear el RDD para obetener ('token', ('id',frecuencia/ len(Doc) ))
  tfRDD=frecuenciasRDD.map(lambda x: (x[0][1],(x[0][0],x[1]/x[0][2])))
  return tfRDD

In [126]:
#Crear el RDD leyendo el archivo .txt
textRDD=sc.textFile('twitts.txt',4)
datos=preprocesar(textRDD)
tf=TF(datos)
tf.take(20)

[('protest', (1, 0.1)),
 ('anger', (1, 0.1)),
 ('misplace', (1, 0.1)),
 ('retweet', (1, 0.1)),
 ('range', (2, 0.08333333333333333)),
 ('user', (2, 0.08333333333333333)),
 ('trading', (2, 0.08333333333333333)),
 ('stock', (2, 0.08333333333333333)),
 ('mkt', (2, 0.08333333333333333)),
 ('ios', (3, 0.1111111111111111)),
 ('app', (3, 0.2222222222222222)),
 ('brain', (3, 0.1111111111111111)),
 ('factcheckthis', (6, 0.5)),
 ('store', (8, 0.1111111111111111)),
 ('customer', (9, 0.25)),
 ('range', (10, 0.08333333333333333)),
 ('user', (10, 0.08333333333333333)),
 ('trading', (10, 0.08333333333333333)),
 ('stock', (10, 0.08333333333333333)),
 ('mkt', (10, 0.08333333333333333))]

## **4.IDF**

In [127]:
import math
from pyspark.sql.functions import *

In [128]:
def IDF(datos):
  """
  Calcula Inverse document frequency
  
  Parameters
  =========
    `datos :` lista de tuplas (id, Doc)
          ej -> [(5, 'founder attack boss ridiculous comment ad'),
                (6, 'evidence factcheckthis'),
                (7, 'educate')]
    
  Returns
  =========
    `idfRDD :`  lista de tuplas ('token', idf(float))
          ej ->[('update', 1.3937460844372076),
                ('aapl', 0.5944950710074257),
                ('stockaction', 2.0249643045063834)]

  """
  cantidadDoc=len(datos)
  frecuenciasRDD=bow(datos)
  # Mapeamos para colocar cada token como valor clave. Ej. -> ('token',('id',frecuencia,1))
  tokenRDD=frecuenciasRDD.map(lambda x: (x[0][1],(x[0][0],x[1],1)))
  # Extraemos el token y su ocurrencia por documento
  tokenOcurrenciaDocRDD=tokenRDD.map(lambda x:(x[0],x[1][2]))
  # contamos la contidad de documentos que contienen un token
  docPerTokenRDD=tokenOcurrenciaDocRDD.reduceByKey(lambda x,y:x+y)
  #calculamos el logaritmo para el IDF
  idfRDD=docPerTokenRDD.map(lambda x: (x[0],math.log10(cantidadDoc/x[1])))
  return idfRDD

In [129]:
#Crear el RDD leyendo el archivo .txt
textRDD=sc.textFile('twitts.txt',4)
datos=preprocesar(textRDD)
idf=IDF(datos)
idf.take(20)

[('store', 1.105595616949241),
 ('customer', 1.968482948553935),
 ('onlinefootprint', 3.5812668052736707),
 ('turn', 2.1662934573028525),
 ('asus', 3.5812668052736707),
 ('freezing', 2.803115554890027),
 ('laggy', 3.5812668052736707),
 ('excellence', 3.5812668052736707),
 ('aapl', 0.5944950710074257),
 ('stockaction', 2.0249643045063834),
 ('stage', 1.3798696809532192),
 ('nyc', 1.353380100659997),
 ('smart', 2.5020855592260456),
 ('team', 2.4351387695954325),
 ('macmail', 3.5812668052736707),
 ('give', 2.1499030411146833),
 ('genius', 2.3259943001703647),
 ('say', 1.947798349694084),
 ('wonder', 2.882296800937652),
 ('think', 1.8104147936315265)]

## **5. TF-IDF**

In [130]:
def TF_IDF(datos):
  """
  Calcula Inverse document frequency
  
  Parameters
  =========
    `datos :` lista de tuplas (id, Doc)
          ej -> [(5, 'founder attack boss ridiculous comment ad'),
                (6, 'evidence factcheckthis'),
                (7, 'educate')]

  Returns
  =========
    `tfIdfRDD :`  lista de tuplas (idDoc,token,tf, idf,tf-idf)
          ej ->[(1, 'store', 1, 1.105595616949241, 1.105595616949241),
                (1, 'stage', 1, 1.3798696809532192, 1.3798696809532192),
                (1, 'nyc', 1, 1.353380100659997, 1.353380100659997)]

  """
  #obtenemos el RDD con el id del documento y la frecuencia para cada token
  tfRDD=TF(datos)
  # obtenemos el RDD con las puntuaciones IDF
  idfRDD=IDF(datos)
  #combinamos para asignar a cada token un id de documento, frecuencia y puntuación IDF
  tfIdfRDD=tfRDD.join(idfRDD)
  #multiplicamos los valores de TF e IDF de cada token
  tfIdfRDD=tfIdfRDD.map(lambda x: (x[1][0][0],(x[0],x[1][0][1],x[1][1],x[1][0][1]*x[1][1]))).sortByKey()
  #reorganizamos el RDD para tener mejor comprension
  tfIdfRDD=tfIdfRDD.map(lambda x: (x[0],x[1][0],x[1][1],x[1][2],x[1][3]))
  return tfIdfRDD

In [131]:
#Crear el RDD leyendo el archivo .txt
textRDD=sc.textFile('twitts.txt',4)
datos=preprocesar(textRDD)
tf_idf=TF_IDF(datos)
tf_idf.take(20)

[(1, 'store', 0.1, 1.105595616949241, 0.11055956169492409),
 (1, 'diein', 0.1, 1.399423217328898, 0.1399423217328898),
 (1, 'retweet', 0.1, 1.399423217328898, 0.1399423217328898),
 (1, 'agree', 0.1, 1.3690792008697128, 0.13690792008697128),
 (1, 'stage', 0.1, 1.3798696809532192, 0.13798696809532193),
 (1, 'nyc', 0.1, 1.353380100659997, 0.13533801006599971),
 (1, 'protester', 0.1, 1.388142206919209, 0.13881422069192093),
 (1, 'protest', 0.1, 1.377146822617746, 0.1377146822617746),
 (1, 'anger', 0.1, 1.388142206919209, 0.13881422069192093),
 (1, 'misplace', 0.1, 1.390935107103379, 0.1390935107103379),
 (2, 'range', 0.08333333333333333, 2.2195389692560776, 0.18496158077133978),
 (2, 'trading', 0.08333333333333333, 1.928054291498327, 0.1606711909581939),
 (2, 'stock', 0.08333333333333333, 1.457415164306585, 0.1214512636922154),
 (2, 'mkt', 0.08333333333333333, 1.968482948553935, 0.1640402457128279),
 (2, 'update', 0.08333333333333333, 1.3937460844372076, 0.11614550703643396),
 (2, 'apple',

## **Resultado**

In [132]:
import pandas as pd
#Creamos el daframe para mostrar los resultados
dfTabla=pd.DataFrame(tf_idf.collect(),columns=['Id Documento','Token','TF','IDF','TF-IDF'])
#mostarlos los 30 primeros resultados obtenidos
dfTabla[:30]

Unnamed: 0,Id Documento,Token,TF,IDF,TF-IDF
0,1,store,0.1,1.105596,0.11056
1,1,diein,0.1,1.399423,0.139942
2,1,retweet,0.1,1.399423,0.139942
3,1,agree,0.1,1.369079,0.136908
4,1,stage,0.1,1.37987,0.137987
5,1,nyc,0.1,1.35338,0.135338
6,1,protester,0.1,1.388142,0.138814
7,1,protest,0.1,1.377147,0.137715
8,1,anger,0.1,1.388142,0.138814
9,1,misplace,0.1,1.390935,0.139094
