#Laboratorio 1
```
Curso     : Mineria de Datos 2021-2
Docente   : CARLOS FERNANDO MONTOYA CUBAS
Autor     : Carlos Enrique Quispe Chambilla
Lugar     : Cusco, Perú, 2022.
```

### PySpark

#### **Este notebook contiene:**
#### *Parte 1:* Creación de una base RDD y RDD de tupla
#### *Parte 2:* Manejo de RDD de tuplas
#### *Parte 3:* Encontrar palabras sueltas y calcular promedios
#### *Parte 4:* Aplicar el recuento de palabras a un archivo
#### *Parte 5:* Similitud entre objetos

## Parte 1: Creación y manipulación de RDD

#### Creando una base RDD

In [None]:
! pip install pyspark

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql import SparkSession
from pyspark import SparkContext
spark = SparkSession.builder.appName('TF IDF')\
    .config('spark.master', 'local[4]')\
    .config('spark.shuffle.sql.partitions', 1)\
    .sc = SparkContext.getOrCreate()



In [4]:
import pyspark # only run after findspark.init()
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

In [8]:
ListaPalavras = ['gato', 'elefante', 'rato', 'rato', 'gato']
palavrasRDD = sc.parallelize(ListaPalavras, 4)
print(type(palavrasRDD))

<class 'pyspark.rdd.RDD'>


#### Plural


In [10]:
# EXERCICIO
def Plural(palavra):
    """Adds an 's' to `palavra`.

    Args:
        palavra (str): A string.

    Returns:
        str: A string with 's' added to it.
    """
    return f"{palavra}s"

print(Plural('gato'))

gatos


In [11]:
help(Plural)

Help on function Plural in module __main__:

Plural(palavra)
    Adds an 's' to `palavra`.
    
    Args:
        palavra (str): A string.
    
    Returns:
        str: A string with 's' added to it.



In [12]:
assert Plural('rato')=='ratos', 'resultado incorreto!'
print ('OK')

OK


#### Aplicar la función a RDD


In [13]:
# Ejemplo
pluralRDD = palavrasRDD.map(Plural)
print (pluralRDD.collect())

['gatos', 'elefantes', 'ratos', 'ratos', 'gatos']


In [14]:
assert pluralRDD.collect()==['gatos','elefantes','ratos','ratos','gatos'], 'valores incorretos!'
print ('OK')

OK


#### **Nota:** use el comando `collect()` solo cuando esté seguro de que la lista cabe en la memoria. Para guardar los resultados en un archivo de texto o base de datos, usaremos otro comando.

In [15]:
# Ejemplo
pluralLambdaRDD = palavrasRDD.map(lambda x: f"{x}s")
print (pluralLambdaRDD.collect())

['gatos', 'elefantes', 'ratos', 'ratos', 'gatos']


In [16]:
assert pluralLambdaRDD.collect()==['gatos','elefantes','ratos','ratos','gatos'], 'valores incorretos!'
print ('OK')

OK


In [17]:
# Ejemplo
pluralTamanho = (pluralRDD
                 .map(lambda x:len(x))
                 .collect()
                 )
print (pluralTamanho)

[5, 9, 5, 5, 5]


In [18]:
assert pluralTamanho==[5,9,5,5,5], 'valores incorretos'
print ("OK")

OK


In [19]:
# Ejemplo
palavraPar = palavrasRDD.map(lambda x: (x, 1))
print (palavraPar.collect())

[('gato', 1), ('elefante', 1), ('rato', 1), ('rato', 1), ('gato', 1)]


In [20]:
assert palavraPar.collect() == [('gato',1),('elefante',1),('rato',1),('rato',1),('gato',1)], 'valores incorretos!'
print ("OK")

OK


### **Parte 2: Manipulación de Tuplas RDD**

#### Manipulemos nuestro RDD para contar las palabras en el texto.

In [21]:
# EXERCICIO
palavrasGrupo = palavraPar.groupByKey()
for chave, valor in palavrasGrupo.collect():
    valores = list(valor)
    print(f'{chave}: {valores}')

elefante: [1]
rato: [1, 1]
gato: [1, 1]


In [22]:
assert sorted(palavrasGrupo.mapValues(lambda x: list(x)).collect()) == [('elefante', [1]), ('gato',[1, 1]), ('rato',[1, 1])], 'Valores incorretos!'
print ("OK")

OK


#### Cálculo de los recuentos


In [23]:
# EXERCICIO
contagemGroup = palavrasGrupo.map(lambda x:(x[0],sum(x[1])))
print (contagemGroup.collect())

[('elefante', 1), ('rato', 2), ('gato', 2)]


In [24]:
assert sorted(contagemGroup.collect())==[('elefante',1), ('gato',2), ('rato',2)], 'valores incorretos!'
print ("OK")

OK


#### reduceByKey


In [25]:
# EXERCICIO
contagem = palavraPar.reduceByKey(lambda x,y:x+y)
print( contagem.collect())

[('elefante', 1), ('rato', 2), ('gato', 2)]


In [26]:
assert sorted(contagem.collect())==[('elefante',1), ('gato',2), ('rato',2)], 'valores incorretos!'
print ("OK")

OK


#### Comandos de agrupación

#### La forma más común de realizar esta tarea, a partir de nuestras RDD RDDwords, es encadenar el mapa y los comandos reduceByKey en una línea de comando.

In [28]:
# EXERCICIO
contagemFinal = (palavrasRDD
                 .map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
                 .collect())                
                 
print (contagemFinal)

[('elefante', 1), ('rato', 2), ('gato', 2)]


In [29]:
assert sorted(contagemFinal)==[('elefante',1), ('gato',2), ('rato',2)], 'valores incorretos!'
print ("OK")

OK


### **Parte 3: Encontrar las palabras únicas y calcular el recuento promedio**

#### Palabras únicas

In [30]:
# EXERCICIO
palavrasUnicas = palavrasRDD.distinct().count()
print (palavrasUnicas)

3


In [31]:
assert palavrasUnicas==3, 'valor incorreto!'
print ("OK")

OK


#### Cálculo del recuento promedio de palabras

In [None]:
# EXERCICIO
# add é equivalente a lambda x,y: x+y
from operator import add
total = (contagem
         .map(lambda x: x[1])
         .reduce(add))
media = total / float(palavrasUnicas)
print (total)
print (round(media, 2))

5
1.67


In [None]:
assert round(media, 2)==1.67, 'valores incorretos!'
print ("OK")

OK


### **Parte 4: Aplicar nuestro algoritmo a un archivo**

#### Función contaPalavras



In [32]:
# EXERCICIO
def contaPalavras(chavesRDD):
    """Creates a pair RDD with word counts from an RDD of words.

    Args:
        chavesRDD (RDD of str): An RDD consisting of words.

    Returns:
        RDD of (str, int): An RDD consisting of (word, count) tuples.
    """
    return (chavesRDD
            .map(lambda x: (x, 1))
            .reduceByKey(lambda x, y: x + y))

print (contaPalavras(palavrasRDD).collect())

[('elefante', 1), ('rato', 2), ('gato', 2)]


In [33]:
assert sorted(contaPalavras(palavrasRDD).collect())==[('elefante',1), ('gato',2), ('rato',2)], 'valores incorretos!'
print ("OK")

OK


#### Normalizando el texto 


In [None]:
# EXERCICIO
import re
def removerPontuacao(texto):
    """Removes punctuation, changes to lower case, and strips leading and trailing spaces.

    Note:
        Only spaces, letters, and numbers should be retained.  Other characters should should be
        eliminated (e.g. it's becomes its).  Leading and trailing spaces should be removed after
        punctuation is removed.

    Args:
        texto (str): A string.

    Returns:
        str: The cleaned up string.
    """
    return re.sub(r'[^A-Za-z0-9 ]', '', texto).strip().lower()
print (removerPontuacao('Ola, quem esta ai??!'))
print (removerPontuacao(' Sem espaco e_sublinhado!'))

In [None]:
assert removerPontuacao(' O uso de virgulas, embora permitido, nao deve contar. ')=='o uso de virgulas embora permitido nao deve contar', 'string incorreta!'
print ("OK")

#### Ubicamos el dataset pg100.txt para importarlo

In [36]:
# Encontrar la Ruta e importar el data.txt
RutaArchivo = "D:/Cursos Hechos/40.- Mineria de Datos/Datasets/pg100.txt"


In [None]:
# Apenas execute a célula
import os.path

arquivo = os.path.join(RutaArchivo) 

# lê o arquivo com textFile e aplica a função removerPontuacao        
shakesRDD = (sc
             .textFile(arquivo, 8)
             .map(removerPontuacao)
             )

# zipWithIndex gera tuplas (conteudo, indice) onde indice é a posição do conteudo na lista sequencial
# Ex.: sc.parallelize(['gato','cachorro','boi']).zipWithIndex() ==> [('gato',0), ('cachorro',1), ('boi',2)]
# sep.join() junta as strings de uma lista através do separador sep. Ex.: ','.join(['a','b','c']) ==> 'a,b,c'
print ('\n'.join(shakesRDD
                .zipWithIndex()
                .map(lambda linha: '{0}: {1}'.format(linha[0],linha[1]))
                .take(15)
               ))

#### Extrayendo las palabras


In [None]:
# EXERCICIO
shakesPalavrasRDD = shakesRDD.flatMap(lambda x: x.split(" "))
total = shakesPalavrasRDD.count()
print (shakesPalavrasRDD.take(5))
print (total)

#### Como habrás notado, el uso de la función map() genera una lista para cada línea, creando un RDD que contiene una lista de listas.


In [None]:
# EXERCICIO
shakesPalavrasRDD = shakesRDD.flatMap(lambda x: x.split())
total = shakesPalavrasRDD.count()
print (shakesPalavrasRDD.take(5))
print (total)

['project', 'gutenbergs', 'the', 'complete', 'works']
959359


In [None]:
assert total==959359, "valor incorreto de palavras!"
print ("OK")
assert shakesPalavrasRDD.take(5)==['project', 'gutenbergs', 'the', 'complete', 'works'],'lista incorreta de palavras'
print ("OK")

#### Número de palabras

In [None]:
# EXERCICIO
top15 = contaPalavras(shakesPalavrasRDD).sortBy(lambda x: x[1], ascending=False).take(15)
print ('\n'.join(map(lambda x: f'{x[0]}: {x[1]}', top15)))

In [None]:
assert top15 == [('the', 29996), ('and', 28353), ('i', 21860), ('to', 20885), ('of', 18811), ('a', 15992), ('you', 14439), ('my', 13191), ('in', 12027), ('that', 11782), ('is', 9711), ('not', 9068), ('with', 8521), ('me', 8271), ('for', 8184)],'valores incorretos!'
print ("OK")

### **Parte 5: Similaridad entre Objetos**

### En esta parte del laboratorio aprenderemos a calcular la distancia entre atributos numéricos, categóricos y textuales.

####  Vectores en el espacio euclidiano 

In [None]:
import numpy as np

# Vamos criar uma função pNorm que recebe como parâmetro p e retorna uma função que calcula a pNorma
def pNorm(p):
    """Generates a function to calculate the p-Norm between two points.

    Args:
        p (int): The integer p.

    Returns:
        Dist: A function that calculates the p-Norm.
    """

    def Dist(x,y):
        return np.power(np.power(np.abs(x-y),p).sum(),1/float(p))
    return Dist

In [None]:
# Vamos criar uma RDD com valores numéricos
np.random.seed(42)
numPointsRDD = sc.parallelize(enumerate(np.random.random(size=(10,100))))

In [None]:
# EXERCICIO
# Procure dentre os comandos do PySpark, um que consiga fazer o produto cartesiano da base com ela mesma
cartPointsRDD = numPointsRDD.cartesian(numPointsRDD)

# Aplique um mapa para transformar nossa RDD em uma RDD de tuplas ((id1,id2), (vetor1,vetor2))
# DICA: primeiro utilize o comando take(1) e imprima o resultado para verificar o formato atual da RDD
cartPointsParesRDD = cartPointsRDD.map(lambda x: ((x[0][0],x[1][0]), (x[0][1],x[1][1])))


# Aplique um mapa para calcular a Distância Euclidiana entre os pares
Euclid = pNorm(2)
distRDD = cartPointsParesRDD.map(lambda x: (x[0], Euclid(x[1][0],x[1][1])))

# Encontre a distância máxima, mínima e média, aplicando um mapa que transforma (chave,valor) --> valor
# e utilizando os comandos internos do pyspark para o cálculo da min, max, mean
statRDD = distRDD.map(lambda x: x[1])

minv, maxv, meanv = statRDD.min(), statRDD.max(), statRDD.mean()
print (minv, maxv, meanv)

0.0 4.709048183663605 3.75119168897537


In [None]:
assert (minv.round(2), maxv.round(2), meanv.round(2))==(0.0, 4.71, 3.75), 'Valores incorretos'
print ("OK")

####  Valores Categóricos 


In [None]:
# Vamos criar uma função para calcular a distância de Hamming
def Hamming(x,y):
    """Calculates the Hamming distance between two binary vectors.

    Args:
        x, y (np.array): Array of binary integers x and y.

    Returns:
        H (int): The Hamming distance between x and y.
    """
    return (x!=y).sum()

# Vamos criar uma função para calcular a distância de Jaccard
def Jaccard(x,y):
    """Calculates the Jaccard distance between two binary vectors.

    Args:
        x, y (np.array): Array of binary integers x and y.

    Returns:
        J (int): The Jaccard distance between x and y.
    """
    return (x==y).sum()/float( np.maximum(x,y).sum() )

In [None]:
# Vamos criar uma RDD com valores categóricos
catPointsRDD = sc.parallelize(enumerate([['alto', 'caro', 'azul'],
                             ['medio', 'caro', 'verde'],
                             ['alto', 'barato', 'azul'],
                             ['medio', 'caro', 'vermelho'],
                             ['baixo', 'barato', 'verde'],
                            ]))

In [None]:
# EXERCICIO
# Crie um RDD de chaves únicas utilizando flatMap
chavesRDD = (catPointsRDD
             .flatMap(lambda x: [((x[0],xi),1) for xi in x[1]])
             .reduceByKey(lambda x,y: x)
             .map(lambda x: x[0])
             )

chaves = dict((v,k) for k,v in chavesRDD.collect())
nchaves = len(chaves)
print (chaves, nchaves)

{'caro': 0, 'azul': 0, 'baixo': 4, 'alto': 2, 'verde': 1, 'medio': 3, 'barato': 2, 'vermelho': 3} 8


In [None]:
assert chaves=={'alto': 2, 'caro': 0, 'baixo': 4, 'verde': 1, 'azul': 2, 'medio': 3, 'barato': 4, 'vermelho': 3}
#,'valores incorretos!'
print ("OK")

assert nchaves==8 
#,'número de chaves incorreta'
print ("OK")

In [None]:
def CreateNP(atributos,chaves):  
    """Binarize the categorical vector using a dictionary of keys.

    Args:
        atributos (list): List of attributes of a given object.
        chaves (dict): dictionary with the relation attribute -> index

    Returns:
        array (np.array): Binary array of attributes.
    """
    
    array = np.zeros(len(chaves))
    for atr in atributos:
        array[ chaves[atr] ] = 1
    return array

# Converte o RDD para o formato binário, utilizando o dict chaves
binRDD = catPointsRDD.map(lambda rec: (rec[0],CreateNP(rec[1], chaves)))
binRDD.collect()

In [None]:
# EXERCICIO
#Libreria Adicional
import statistics as stats

# Procure dentre os comandos do PySpark, um que consiga fazer o produto cartesiano da base com ela mesma
cartBinRDD = binRDD.cartesian(binRDD)

# Aplique um mapa para transformar nossa RDD em uma RDD de tuplas ((id1,id2), (vetor1,vetor2))
# DICA: primeiro utilize o comando take(1) e imprima o resultado para verificar o formato atual da RDD
cartBinParesRDD = cartBinRDD.map(lambda x: ((x[0][0],x[1][0]), (x[0][1],x[1][1])))

# Aplique um mapa para calcular a Distância de Hamming e Jaccard entre os pares
hamRDD = cartBinParesRDD.map(lambda x: (x[0], Hamming(x[1][0],x[1][1])))
jacRDD = cartBinParesRDD.map(lambda x: (x[0], Jaccard(x[1][0],x[1][1])))

# Encontre a distância máxima, mínima e média, aplicando um mapa que transforma (chave,valor) --> valor
# e utilizando os comandos internos do pyspark para o cálculo da min, max, mean
statHRDD = hamRDD.map(lambda x: x[1])
statJRDD = jacRDD.map(lambda x: x[1])

Hmin = min(statHRDD)
Hmax = max(statHRDD)
Hmean= mean(statHRDD)

Jmin = min(statJRDD)
Jmax =  max(statJRDD)
Jmean = mean(statJRDD) 

print ("\t\tMin\tMax\tMean")
print ("Hamming:\t{:.2f}\t{:.2f}\t{:.2f}".format(Hmin, Hmax, Hmean ))
print ("Jaccard:\t{:.2f}\t{:.2f}\t{:.2f}".format( Jmin, Jmax, Jmean ))

In [None]:
assert (Hmin.round(2), Hmax.round(2), Hmean.round(2)) == (0.00,5.00,2.40), 'valores incorretos'
print ("OK")
assert (Jmin.round(2), Jmax.round(2), Jmean.round(2)) == (0.60,4.00,1.90), 'valores incorretos'
print ("OK")