# **Mineria de Datos**
# **Laboratorio de Preprocesamiento**
-----------------------------------
### **Nombres:** Antony Isaac Huaman Hermoza
### **Código:** 170434
-----------------------------------

## **Instalamos PySpark**

In [1]:
!pip install pyspark==3.0.1 py4j==0.10.9

Collecting pyspark==3.0.1
  Downloading pyspark-3.0.1.tar.gz (204.2 MB)
[K     |████████████████████████████████| 204.2 MB 30 kB/s 
[?25hCollecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 53.2 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612246 sha256=50a7daacc6d864a74ddca0d382819adaab044f2c1f844640ff67536692e3508b
  Stored in directory: /root/.cache/pip/wheels/5e/34/fa/b37b5cef503fc5148b478b2495043ba61b079120b7ff379f9b
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1


## **Importamos librerias**

In [2]:
##############################
#Para PySpark en general
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
##############################
#Complementarias
import pyspark.sql.functions as F
from functools import reduce
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
import math
##############################
#Liberias utilizadas para el escalonamiento
import IPython
from pyspark import SparkConf 
from pyspark.context import SparkContext 
from pyspark.sql import SparkSession
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
##############################
#Liberias utilizadas para normalización
import math
from pyspark.sql.functions import lit
from pyspark.sql.types import FloatType
###############################
#Liberias utilizadas para binarización
import pyspark.sql.functions as F
from functools import reduce
###############################
#Liberias utilizadas para Distancia de Minkowski
import numpy as np
###############################
#Liberias utilizadas para Distancia Jaccard utiliza las anteriores

## **1. Escalonamiento**

In [4]:
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [8]:
#Creamos datos
df = spark.createDataFrame([ (1, 'Messi',12560,45,"True"),
                             (2, 'James',42560,90,"False"),
                             (3, 'Oliver',31285,81,"True"),
                             (4, 'Isaac',25285,50,"True"),
                             (5, 'Irving',32285,55,"False"),
                             (6, 'Franco',18085,61,"True"),
                             (7, 'Leonel',52185,70,"False"),
                           ], ["id_usuario", "Nombre","Ingresos","Por_dia","Morocidad"])

#Mostrar datos iniciales
print("Data :")
df.show(7)

#UDF para convertir el tipo de columna de vector a tipo doble
unlist = udf(lambda x: round(float(list(x)[0]),3), DoubleType())

#Iterando sobre columnas para el escalonamiento
for i in ["Ingresos","Por_dia"]:
    #Transformación con VectorAssembler (conversión de columna a tipo vector)
    assembler = VectorAssembler(inputCols=[i],outputCol=i+"_Vect")

    #Transformación MinMaxScaler
    scaler = MinMaxScaler(inputCol=i+"_Vect", outputCol=i+"_Escalonado")

    #Canalización de VectorAssembler y MinMaxScaler
    pipeline = Pipeline(stages=[assembler, scaler])

    #Ajuste del pipeline en el marco de datos
    df = pipeline.fit(df).transform(df).withColumn(i+"_Escalonado", unlist(i+"_Escalonado")).drop(i+"_Vect")

print("Data + Escalonamiento :")
df.show(7)

Data :
+----------+------+--------+-------+---------+
|id_usuario|Nombre|Ingresos|Por_dia|Morocidad|
+----------+------+--------+-------+---------+
|         1| Messi|   12560|     45|     True|
|         2| James|   42560|     90|    False|
|         3|Oliver|   31285|     81|     True|
|         4| Isaac|   25285|     50|     True|
|         5|Irving|   32285|     55|    False|
|         6|Franco|   18085|     61|     True|
|         7|Leonel|   52185|     70|    False|
+----------+------+--------+-------+---------+

Data + Escalonamiento :
+----------+------+--------+-------+---------+-------------------+------------------+
|id_usuario|Nombre|Ingresos|Por_dia|Morocidad|Ingresos_Escalonado|Por_dia_Escalonado|
+----------+------+--------+-------+---------+-------------------+------------------+
|         1| Messi|   12560|     45|     True|                0.0|               0.0|
|         2| James|   42560|     90|    False|              0.757|               1.0|
|         3|Oliver|  

Podemos observar que el ingreso mínimos es de 12560 y el máximo 52185.

## **2. Normalizacion**
Podemos normalizar nuestro dataset:

In [9]:
#Nomazlizar elementos aleatorios 
def normalizar(x):
    #Vector para normalizar
    listaCuadrada=x.map(lambda xi:xi*xi)
    total=listaCuadrada.sum()
    val=math.sqrt(total)
    #Escalonar vector
    lista= x.map(lambda xi :(xi/val))
    return lista 

#Creamos datos
df = spark.createDataFrame([ (1, 'Messi',12560,45,"True"),
                             (2, 'James',42560,90,"False"),
                             (3, 'Oliver',31285,81,"True"),
                             (4, 'Isaac',25285,50,"True"),
                             (5, 'Irving',32285,55,"False"),
                             (6, 'Franco',18085,61,"True"),
                             (7, 'Leonel',52185,70,"False"),
                           ], ["id_usuario", "Nombre","Ingresos","Por_dia","Morocidad"])

#Mostrar datos iniciales
print("Data :")
df.show(7)
#############################################################
lista = df.select('Ingresos').rdd.map(lambda row : row[0]).collect()
Vector = sc.parallelize(lista,4)
VectorEscalonada=normalizar(Vector)
a=VectorEscalonada.collect()
df2 = spark.createDataFrame(a, FloatType())
df2=df2.withColumnRenamed("value", "Ingresos_Normalizado")
print("Columna Ingresos Normalizado :")
#Mostrar la columna
df2.show()

Data :
+----------+------+--------+-------+---------+
|id_usuario|Nombre|Ingresos|Por_dia|Morocidad|
+----------+------+--------+-------+---------+
|         1| Messi|   12560|     45|     True|
|         2| James|   42560|     90|    False|
|         3|Oliver|   31285|     81|     True|
|         4| Isaac|   25285|     50|     True|
|         5|Irving|   32285|     55|    False|
|         6|Franco|   18085|     61|     True|
|         7|Leonel|   52185|     70|    False|
+----------+------+--------+-------+---------+

Columna Ingresos Normalizado :
+--------------------+
|Ingresos_Normalizado|
+--------------------+
|          0.14332211|
|            0.485652|
|          0.35699302|
|          0.28852704|
|          0.36840403|
|          0.20636787|
|           0.5954828|
+--------------------+



## **3. Binarizacion**

Consiste en transformar las cadenas de texto que tienen valores de string (True y False) a valores numericos, 0 para False y 1 para True

In [13]:
#Creamos los datos
df = spark.createDataFrame([ (1, 'Messi',12560,45,"True"),
                             (2, 'James',42560,90,"False"),
                             (3, 'Oliver',31285,81,"True"),
                             (4, 'Isaac',25285,50,"True"),
                             (5, 'Irving',32285,55,"False"),
                             (6, 'Franco',18085,61,"True"),
                             (7, 'Leonel',52185,70,"False"),
                           ], ["id_usuario", "Nombre","Ingresos","Por_dia","Morocidad"])

#Mostramos los datos
print("Data :")
df.show()

#Aqui podemos elegir en que columnas aplicar la binarizacion
cols = ['Morocidad']

#Aplicar la binarizacion al dataframe spark
df_reduced = reduce(lambda df, c: df.withColumn(c, F.when(df[c] == 'False', 0.0).otherwise(1.0)), cols, df)

#Mostrar columna binarizada
print("Data + Columna binarizada:")
df_reduced.show()

Data :
+----------+------+--------+-------+---------+
|id_usuario|Nombre|Ingresos|Por_dia|Morocidad|
+----------+------+--------+-------+---------+
|         1| Messi|   12560|     45|     True|
|         2| James|   42560|     90|    False|
|         3|Oliver|   31285|     81|     True|
|         4| Isaac|   25285|     50|     True|
|         5|Irving|   32285|     55|    False|
|         6|Franco|   18085|     61|     True|
|         7|Leonel|   52185|     70|    False|
+----------+------+--------+-------+---------+

Data + Columna binarizada:
+----------+------+--------+-------+---------+
|id_usuario|Nombre|Ingresos|Por_dia|Morocidad|
+----------+------+--------+-------+---------+
|         1| Messi|   12560|     45|      1.0|
|         2| James|   42560|     90|      0.0|
|         3|Oliver|   31285|     81|      1.0|
|         4| Isaac|   25285|     50|      1.0|
|         5|Irving|   32285|     55|      0.0|
|         6|Franco|   18085|     61|      1.0|
|         7|Leonel|   521

## **4. Distancia de Minkowski**

In [14]:
# Creemos una función rNorm que toma como parámetro y devuelve una función que calcula el pNorm
def pNorm(p):
    def Dist(x,y):
        return np.power(np.power(np.abs(x-y),p).sum(),1/float(p))
    return Dist
# Creemos un RDD con valores numéricos.
np.random.seed(50)
num_p = sc.parallelize(enumerate(np.random.random(size=(10,100))))

In [15]:
num_p

ParallelCollectionRDD[148] at readRDDFromFile at PythonRDD.scala:262

In [16]:
#Formula Minswoski
dat_p = num_p.cartesian(num_p)
dato_p = dat_p.map(lambda x: ((x[0][0],x[1][0]), (x[0][1],x[1][1])))
#minwoski el valor p tendra diferentes valores 
p = 5
#p = 6
# p = 7
Minkow = pNorm(p)
dist = dato_p.map(lambda x: (x[0], Minkow(x[1][0],x[1][1])))
soluc = dist.map(lambda x: x[1])
minv, maxv, meanv = soluc.min(), soluc.max(), soluc.mean()
print('minimo valor minkowski: ',minv)
print('maximo valor minkowski: ',maxv)
print('Media de los valores minkowski: ',meanv)

minimo valor minkowski:  0.0
maximo valor minkowski:  1.444514010466728
Media de los valores minkowski:  1.2070128755149965


## **5. Distancia Jaccard**

In [17]:
#Calcula la distancia Jaccard entre dos vectores binarios.

def Jaccard(x,y):
    return (x==y).sum()/float( np.maximum(x,y).sum())

colores = sc.parallelize(enumerate([['Amarillo', 'Rojo', 'Negro'],
                             ['Rojo', 'Celeste', 'Amarillo'],
                             ['Negro', 'Celeste', 'azul'],
                             ['Amarillo', 'Rojo', 'Negro'],
                             ['Negro', 'Amarillo', 'Celeste'],
                            ]))
datos = (colores
             .flatMap(lambda x: [((x[0],xi),1) for xi in x[1]])
             .reduceByKey(lambda x,y: x)
             .map(lambda x: x[0])
             )

dato = dict((v,k) for k,v in datos.collect())
ndato = len(dato)
print(dato, ndato)

{'Amarillo': 3, 'Rojo': 0, 'Celeste': 1, 'Negro': 4, 'azul': 2} 5


In [21]:
#Binarizar el vector categórico usando un diccionario de key
#atributos (lista): Lista de atributos de un objeto dado
def Binarizar(atributos,dato):  
    array = np.zeros(len(dato))
    for atr in atributos:
        array[ dato[atr] ] = 1
    return array

# Convierta datosa formato binario usando key  dict
binarizar = colores.map(lambda rec: (rec[0],Binarizar(rec[1], dato)))
binarizar.collect()

[(0, array([1., 0., 0., 1., 1.])),
 (1, array([1., 1., 0., 1., 0.])),
 (2, array([0., 1., 1., 0., 1.])),
 (3, array([1., 0., 0., 1., 1.])),
 (4, array([0., 1., 0., 1., 1.]))]

In [19]:
# Adquirir dentro de los  PySpark para hallar produto cartesiano 
Binario = binarizar.cartesian(binarizar)
# Aplicar un mapa para transformar nuestro RDD en un RDD de tupla ((id1, id2), (vector1, vector2))
# use el comando take (1) e imprima el resultado para verificar el formato RDD actual
Binario_par = Binario.map(lambda x: ((x[0][0],x[1][0]), (x[0][1],x[1][1])))
#Aplicar un mapa para calcular la distancia de Jaccard entre pares
jacRDD = Binario_par.map(lambda x: (x[0], Jaccard(x[1][0],x[1][1])))
#calcular min, max, mean
statJRDD = jacRDD.map(lambda x: x[1])
Jmin, Jmax, Jmean = statJRDD.min(), statJRDD.max(), statJRDD.mean()
print ("\t\tMin\tMax\tMedia")
print ("Jaccard:\t{:.2f}\t{:.2f}\t{:.2f}".format( Jmin, Jmax, Jmean ))

		Min	Max	Media
Jaccard:	0.20	1.67	0.87
