#**2° PARCIAL - 5 ALGORITMOS DE PREPROCESAMIENTO**
---
```
Universidad Nacional de San Antonio Abad del Cusco
Asignatura: Mineria de Datos
Docente   : Carlos Fernando Montoya Cubas
Autor     : Etson Ronaldao Rojas Cahuana
Fecha     : 09/01/2022
Lugar     : Cusco, Perú
Proposito : Implementar 5 algoritmos de preprocesamiento en pySpark.
```
---


#Instalacion de pySpark en Google Colaboratory

In [None]:
!pip install pyspark==3.0.1 py4j==0.10.9

Collecting pyspark==3.0.1
  Downloading pyspark-3.0.1.tar.gz (204.2 MB)
[K     |████████████████████████████████| 204.2 MB 34 kB/s 
[?25hCollecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 51.2 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612246 sha256=c62e6a2ef176ce8727c72175d5c4a276e0f5362360eed5d9b5f11896cd821360
  Stored in directory: /root/.cache/pip/wheels/5e/34/fa/b37b5cef503fc5148b478b2495043ba61b079120b7ff379f9b
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1


In [None]:
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

#**1. Escalonamiento**

Los algoritmos basados en metodos de gradiente tienden a beneficiarse cuando los atributos estan entre $[0,1]$.

$X_{i,j} = \frac{X_{i,j} - max X_j} {max X_j - min_{X_j}}$



In [None]:
import IPython
from pyspark import SparkConf 
from pyspark.context import SparkContext 
from pyspark.sql import SparkSession
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

In [None]:
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [None]:
#Creamos datos
df = spark.createDataFrame([ (1, 'Aldo',12560,45,"True"),
                             (2, 'Benzema',42560,90,"False"),
                             (3, 'Cesar',31285,81,"True"),
                             (4, 'Doris',25285,50,"True"),
                             (5, 'Etson',32285,55,"False"),
                             (6, 'Fausto',18085,61,"True"),
                             (7, 'Gilberto',52185,70,"False"),
                             (8, 'Humberto',10345,46,"False")
                           ], ["id_usuario", "Nombre","Ingresos","Por_dia","Morocidad"])

#Mostrar datos iniciales
print("Data :")
df.show(8)

#UDF para convertir el tipo de columna de vector a tipo doble
unlist = udf(lambda x: round(float(list(x)[0]),3), DoubleType())

#Iterando sobre columnas para el escalonamiento
for i in ["Ingresos","Por_dia"]:
    #Transformación con VectorAssembler (conversión de columna a tipo vector)
    assembler = VectorAssembler(inputCols=[i],outputCol=i+"_Vect")

    #Transformación MinMaxScaler
    scaler = MinMaxScaler(inputCol=i+"_Vect", outputCol=i+"_Escalonado")

    #Canalización de VectorAssembler y MinMaxScaler
    pipeline = Pipeline(stages=[assembler, scaler])

    #Ajuste del pipeline en el marco de datos
    df = pipeline.fit(df).transform(df).withColumn(i+"_Escalonado", unlist(i+"_Escalonado")).drop(i+"_Vect")

print("Data + Escalonamiento :")
df.show(8)

Data :
+----------+--------+--------+-------+---------+
|id_usuario|  Nombre|Ingresos|Por_dia|Morocidad|
+----------+--------+--------+-------+---------+
|         1|    Aldo|   12560|     45|     True|
|         2| Benzema|   42560|     90|    False|
|         3|   Cesar|   31285|     81|     True|
|         4|   Doris|   25285|     50|     True|
|         5|   Etson|   32285|     55|    False|
|         6|  Fausto|   18085|     61|     True|
|         7|Gilberto|   52185|     70|    False|
|         8|Humberto|   10345|     46|    False|
+----------+--------+--------+-------+---------+

Data + Escalonamiento :
+----------+--------+--------+-------+---------+-------------------+------------------+
|id_usuario|  Nombre|Ingresos|Por_dia|Morocidad|Ingresos_Escalonado|Por_dia_Escalonado|
+----------+--------+--------+-------+---------+-------------------+------------------+
|         1|    Aldo|   12560|     45|     True|              0.053|               0.0|
|         2| Benzema|   4256

El valor minimo de la columna *Ingresos* es 10345 y el maximo es 52185, estos estan representados correctamente en el escalonamiento teniendo un 0 y 1 respectivamente, de manera similar sucede en la columna *Por_dia*.

#**2. Normalizacion**

Podemos normalizar cada muestra
del dataset usando la normalización vectorial:

$$\widehat{X}_{i,j}=\frac{X_{i,j}}{||X_i||_p}$$

In [116]:
import math
from pyspark.sql.functions import lit
from pyspark.sql.types import FloatType
#Nomazlizar elementos aleatorios 
def normalizar(x):
    #Vector para normalizar
    listaCuadrada=x.map(lambda xi:xi*xi)
    total=listaCuadrada.sum()
    val=math.sqrt(total)
    #Escalonar vector
    lista= x.map(lambda xi :(xi/val))
    return lista 

#Creamos datos
df = spark.createDataFrame([ (1, 'Aldo',12560,45,"True"),
                             (2, 'Benzema',42560,90,"False"),
                             (3, 'Cesar',31285,81,"True"),
                             (4, 'Doris',25285,50,"True"),
                             (5, 'Etson',32285,55,"False"),
                             (6, 'Fausto',18085,61,"True"),
                             (7, 'Gilberto',52185,70,"False"),
                             (8, 'Humberto',10345,46,"False")
                           ], ["id_usuario", "Nombre","Ingresos","Por_dia","Morocidad"])

#Mostrar datos iniciales
print("Data :")
df.show(8)


lista = df.select('Ingresos').rdd.map(lambda row : row[0]).collect()
Vector = sc.parallelize(lista,4)
VectorEscalonada=normalizar(Vector)
a=VectorEscalonada.collect()
df2 = spark.createDataFrame(a, FloatType())
df2=df2.withColumnRenamed("value", "Ingresos_Normalizado")
print("Columna Ingresos Normalizado :")
#Mostrar la columna
df2.show()


Data :
+----------+--------+--------+-------+---------+
|id_usuario|  Nombre|Ingresos|Por_dia|Morocidad|
+----------+--------+--------+-------+---------+
|         1|    Aldo|   12560|     45|     True|
|         2| Benzema|   42560|     90|    False|
|         3|   Cesar|   31285|     81|     True|
|         4|   Doris|   25285|     50|     True|
|         5|   Etson|   32285|     55|    False|
|         6|  Fausto|   18085|     61|     True|
|         7|Gilberto|   52185|     70|    False|
|         8|Humberto|   10345|     46|    False|
+----------+--------+--------+-------+---------+

Columna Ingresos Normalizado :
+--------------------+
|Ingresos_Normalizado|
+--------------------+
|          0.14233384|
|          0.48230317|
|          0.35453135|
|           0.2865375|
|          0.36586368|
|          0.20494485|
|          0.59137666|
|          0.11723276|
+--------------------+



#**3. Binarizacion**
Consiste en transformar las cadenas de texto que tienen valores de string (True y False) a valores numericos, 0 para False y 1 para True

In [None]:
import pyspark.sql.functions as F
from functools import reduce

In [None]:
#Creamos el data a usar
df = spark.createDataFrame([ (1, 'Aldo',12560,45,"True"),
                             (2, 'Benzema',42560,90,"False"),
                             (3, 'Cesar',31285,81,"True"),
                             (4, 'Doris',25285,50,"True"),
                             (5, 'Etson',32285,55,"False"),
                             (6, 'Fausto',18085,61,"True"),
                             (7, 'Gilberto',52185,70,"False"),
                             (8, 'Humberto',10345,46,"False")
                           ], ["id_usuario", "Nombre","Ingresos","Por_dia","Morocidad"])

#Mostramos los datos
print("Data :")
df.show()

#Aqui podemos elegir en que columnas aplicar la binarizacion
cols = ['Morocidad']

#Aplicar la binarizacion al dataframe spark
df_reduced = reduce(lambda df, c: df.withColumn(c, F.when(df[c] == 'False', 0.0).otherwise(1.0)), cols, df)

#Mostrar columna binarizada
print("Data + Columna binarizada:")
df_reduced.show()

Data :
+----------+--------+--------+-------+---------+
|id_usuario|  Nombre|Ingresos|Por_dia|Morocidad|
+----------+--------+--------+-------+---------+
|         1|    Aldo|   12560|     45|     True|
|         2| Benzema|   42560|     90|    False|
|         3|   Cesar|   31285|     81|     True|
|         4|   Doris|   25285|     50|     True|
|         5|   Etson|   32285|     55|    False|
|         6|  Fausto|   18085|     61|     True|
|         7|Gilberto|   52185|     70|    False|
|         8|Humberto|   10345|     46|    False|
+----------+--------+--------+-------+---------+

Data + Columna binarizada:
+----------+--------+--------+-------+---------+
|id_usuario|  Nombre|Ingresos|Por_dia|Morocidad|
+----------+--------+--------+-------+---------+
|         1|    Aldo|   12560|     45|      1.0|
|         2| Benzema|   42560|     90|      0.0|
|         3|   Cesar|   31285|     81|      1.0|
|         4|   Doris|   25285|     50|      1.0|
|         5|   Etson|   32285|    


#**4. Distancia de Minkowski**
Como sabemos la distancia de minkowski es la Distancia entre dos vectores numéricos.

El parámetro p de la métrica de distancia de Minkowski de **`Pyspark`** representa el orden de la norma. Cuando el orden (p) es 1, representará la Distancia de Manhattan y cuando el orden en la fórmula anterior es 2, representará la Distancia euclidiana.

Cuando nuestros objetos están representados en el espacio Minkowski, medimos la similitud entre ellos a través de la *p-Norma* definida por:

$$d(x,y,p) = (\sum_{i=1}^{n}{|x_i - y_i|^p})^{1/p}$$      $$p≥1$$

la norma $p$ siempre puede tomar cualquier valor:


$$d(x,y,p) = (\sum_{i=1}^{n}{|x_i - y_i|^p})$$

$$d(x,y,\infty) = \max(|x_1 - y_1|,|x_2 - y_2|, ..., |x_n - y_n|)^p$$

$$d(x,y,\infty) = \min(|x_1 - y_1|,|x_2 - y_2|, ..., |x_n - y_n|)^p$$

In [None]:
import numpy as np

# Creemos una función rNorm que toma como parámetro y devuelve una función que calcula el pNorm
def pNorm(p):
    def Dist(x,y):
        return np.power(np.power(np.abs(x-y),p).sum(),1/float(p))
    return Dist
# Creemos un RDD con valores numéricos.
np.random.seed(50)
num_p = sc.parallelize(enumerate(np.random.random(size=(10,100))))

In [None]:
num_p

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:262

Aqui mostramos la formula del minkowski tiene un parecido a la formula euclidian lo que diferencia es su norma ya que el monkwoski puede aceptar valores P y el euclian solo valor 2


In [None]:
#Formula Minswoski
dat_p = num_p.cartesian(num_p)
dato_p = dat_p.map(lambda x: ((x[0][0],x[1][0]), (x[0][1],x[1][1])))
#minwoski el valor p tendra diferentes valores 
p = 5
#p = 6
# p = 7
Minkow = pNorm(p)
dist = dato_p.map(lambda x: (x[0], Minkow(x[1][0],x[1][1])))
soluc = dist.map(lambda x: x[1])
minv, maxv, meanv = soluc.min(), soluc.max(), soluc.mean()
print('minimo valor minkowski: ',minv)
print('maximo valor minkowski: ',maxv)
print('Media de los valores minkowski: ',meanv)

minimo valor minkowski:  0.0
maximo valor minkowski:  1.444514010466728
Media de los valores minkowski:  1.2070128755149965


#**5. Distancia Jaccard**

La distancia de Jaccard ( $I_J$ ) o coeficiente de Jaccard ( $I_J$ ) mide el grado de similitud entre dos conjuntos, sea cual sea el tipo de elementos.

La formulación es la siguiente:

$$ J(x,y) = \frac{\sum_{i=1}^{n}{x_i == y_i} }{\sum_{i=1}^{n}{\max(x_i, y_i}) } $$

In [None]:
#Calcula la distancia Jaccard entre dos vectores binarios.
#x, y (np.array): Matriz de enteros binarios x and y.
#Returns: J (int): La distancia Jaccard entre x and y.
def Jaccard(x,y):
    return (x==y).sum()/float( np.maximum(x,y).sum())

colores = sc.parallelize(enumerate([['rojo', 'negro', 'azul'],
                             ['rosado', 'negro', 'verde'],
                             ['rosado', 'amarillo', 'azul'],
                             ['rosado', 'negro', 'verde'],
                             ['rojo', 'amarillo', 'verde'],
                            ]))
datos = (colores
             .flatMap(lambda x: [((x[0],xi),1) for xi in x[1]])
             .reduceByKey(lambda x,y: x)
             .map(lambda x: x[0])
             )

dato = dict((v,k) for k,v in datos.collect())
ndato = len(dato)
print(dato, ndato)

{'rojo': 4, 'negro': 3, 'rosado': 3, 'verde': 3, 'azul': 2, 'amarillo': 4} 6


In [None]:
#Binarizar el vector categórico usando un diccionario de key
#atributos (lista): Lista de atributos de un objeto dado
def Bina(atributos,dato):  
    array = np.zeros(len(dato))
    for atr in atributos:
        array[ dato[atr] ] = 1
    return array

# Convierta datosa formato binario usando key  dict
binarizar = colores.map(lambda rec: (rec[0],Bina(rec[1], dato)))
binarizar.collect()

[(0, array([0., 0., 1., 1., 1., 0.])),
 (1, array([0., 0., 0., 1., 0., 0.])),
 (2, array([0., 0., 1., 1., 1., 0.])),
 (3, array([0., 0., 0., 1., 0., 0.])),
 (4, array([0., 0., 0., 1., 1., 0.]))]

In [None]:
# Adquirir dentro de los  PySpark para hallar produto cartesiano 
Binario = binarizar.cartesian(binarizar)
# Aplicar un mapa para transformar nuestro RDD en un RDD de tupla ((id1, id2), (vector1, vector2))
# use el comando take (1) e imprima el resultado para verificar el formato RDD actual
Binario_par = Binario.map(lambda x: ((x[0][0],x[1][0]), (x[0][1],x[1][1])))
#Aplicar un mapa para calcular la distancia de Jaccard entre pares
jacRDD = Binario_par.map(lambda x: (x[0], Jaccard(x[1][0],x[1][1])))
#calcular min, max, mean
statJRDD = jacRDD.map(lambda x: x[1])
Jmin, Jmax, Jmean = statJRDD.min(), statJRDD.max(), statJRDD.mean()
print ("\t\tMin\tMax\tMedia")
print ("Jaccard:\t{:.2f}\t{:.2f}\t{:.2f}".format( Jmin, Jmax, Jmean ))

		Min	Max	Mean
Jaccard:	1.33	6.00	2.49
