# **ALGORITMOS DE PREPROCESAMIENTO EN PYSPARK**



## 1. Instalar pyspark

In [1]:
! pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 38 kB/s 
[?25hCollecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 52.0 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=e28df31fbb6ca64a3e7f9d62b2eb7f9de482c6cd2558de8b6a64dfaa52a789a7
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


## 2. Guardar la libería en una variable con SparkContext

In [2]:
from pyspark import SparkContext
sc = SparkContext()

## 3. Algoritmo Similitud de Cosenos

In [None]:
#Crear 1er vector 
vector1 = [1, 1, 1, 1, 0, 1, 1, 0, 0, 1] 
#Crear 2do vector
vector2 = [0, 1, 1, 1, 1, 0, 1, 1, 0, 1] 
#Unir vectores
vector3 = list(zip(vector1,vector2))

In [None]:
#Crear RDD para cada vector y paralelizar
cosenos1RDD = sc.parallelize(vector1)
cosenos2RDD = sc.parallelize(vector2)
cosenos3RDD = sc.parallelize(vector3)

In [None]:
#Función RDD
# Aplicar una funcion map multiplicar ambos valores y sumar, luego elevar al cuadrado estos valores para sacer raíz y dividir la función
respuesta = cosenos3RDD.map(lambda x: x[0]*x[1]).sum() / (cosenos1RDD.map(lambda x:x**2).sum()**(1/2) * cosenos2RDD.map(lambda x:x**2).sum()**(1/2))
print ("La similud es: ",respuesta)

La similud es:  0.7142857142857142


## 4. Algoritmo de escalamiento de datos

In [None]:
x = [55,6,17,28,19,4,16,15,12,18,52,14,22,7,4,30,51,5]

In [None]:
#Crear RDD
xRDD = sc.parallelize(x)

In [None]:
#Función RDD
#Hallar el valor maximo
minRDD = xRDD.max()
#Hallar el valor minimo
maxRDD = xRDD.min()
#Hallar el escalamiento
scaleRDD = xRDD.map(lambda x: (x - minRDD) / (maxRDD - minRDD))
#Mostrar datos
scaleRDD.collect()

[-0.0,
 0.9607843137254902,
 0.7450980392156863,
 0.5294117647058824,
 0.7058823529411765,
 1.0,
 0.7647058823529411,
 0.7843137254901961,
 0.8431372549019608,
 0.7254901960784313,
 0.058823529411764705,
 0.803921568627451,
 0.6470588235294118,
 0.9411764705882353,
 1.0,
 0.49019607843137253,
 0.0784313725490196,
 0.9803921568627451]

## 5. Algoritmo distancia de Mikowski

In [41]:
#Creamos el primer vector 
vector1 = [1,2,3,5,6,7,8,9,10,11] 
#Creamos el segundo vector
vector2 = [3,2,1,0,10,9,8,7,6,5] 
#Inicializar p
p = 3

In [42]:
#Crear un vector 3 con una tuplauna tupla la cual se creara con un zip, 
#en la cual se unirán los 2 vectores
vector3 = list(zip(vector1,vector2))

In [43]:
#Función RDD
mikowskiRDD = sc.parallelize(vector3)

In [44]:
#Aplicar función
#Crear una función lambda donde se utilizarán las tuplas para restar el 1er vector menos el segundo
#Luego sumaremos los valores y sacar la raíz
mikowskiRDD.map(lambda x: abs(x[0] - x[1])**p).sum()**(1.0/p)

7.942293073037288

## 6. Algoritmo distancia de Jaccard

In [37]:
x = [6,8,4,2,11,14,1,3,7,8]
y = [1,10,3,9,16,18,21,12,13]

In [38]:
#Crear un vector donde se tendrá una tupla la cual con un zip donde se uni´ran ambos vectores
z = list(zip(x,y))

In [39]:
#Crear RDD
zRDD = sc.parallelize(z)

In [40]:
#Función RDD
#Crear una variable sumando los productos 
sumProdRDD = zRDD.map(lambda x: x[0]*x[1]).sum()
#Usar la función map
sumSumRDD = zRDD.map(lambda x: 1 if (x[0]==1 or x[1]==1) else 0 ).sum()
#Restar 1 a la divisipon de las variables 
jaccardRDD = 1 - (sumProdRDD / sumSumRDD)
print("La distancia de Jaccard es: ",jaccardRDD)

La distancia de Jaccard es:  -345.0


##7. Algoritmo de normalizacion

In [21]:
# Importar librería math
import math
x = [6,9,65,32,12,1,5,6,22,24,60]

In [22]:
#Crear RDD
xRDD = sc.parallelize(x)

In [23]:
# Función RDD
# Elevar al cuadrado los datos, sumar los valores luego sacar la raíz cuadrada con la función map
nxRDD = (xRDD.map(lambda x: x**2).sum()) ** (1/2)
# Dividir a los datos iniciales
normRDD = xRDD.map(lambda x: x/nxRDD)
#Mostrar resultados
normRDD.collect()

[0.0593158809213532,
 0.0889738213820298,
 0.6425887099813263,
 0.31635136491388377,
 0.1186317618427064,
 0.009885980153558868,
 0.049429900767794334,
 0.0593158809213532,
 0.2174915633782951,
 0.2372635236854128,
 0.593158809213532]