#**Area: Mineria de Datos**

#**Procesamiento con PySpark**

#**Alumno: Huillca Herrera Victor Pool**

#**Codigo: 163845**

#Libreria necesaria para la ejecucion del programa

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 46 kB/s 
[?25hCollecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 70.3 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=674d16e7c679a1df6788269a392fcbb8042fe402fe60524e64fdf53e3c2c9245
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


Permitamos la manipulacion y creacion de RDDs

In [None]:
from pyspark import SparkContext
sc =SparkContext() 

#1.-Distancia de Minkowski

In [None]:
def subtraction_p(tupla,p):
  return abs(tupla[0]-tupla[1])**p

In [None]:
# Calcular la distancia de Minkowski
def distanceMinkowskiSpark(v1,v2,p,paticiones=4):
  # Creamos tuplas para cada i de los vectores (v1,v2)
  v3=list(zip( v1, v2)) 
  # Creamos el RDD  
  coordenadasRDD=sc.parallelize(v3,paticiones)  
  # Calculamos todos los |xi-yi|^p
  subtraction_p_RDD=coordenadasRDD.map(lambda x: subtraction_p(x,p))
  # Calculamos la sumatoria
  sumRDD=subtraction_p_RDD.sum()               
  distance=sumRDD**(1/p)                        
  return distance

# Ejemplo con la Distancia de Minkowski

In [None]:
v1=list(range(2,5))
v2=list(range(6,9))
p=2
# Usando Spark 
print("La distancia es:", distanceMinkowskiSpark(v1,v2,p))
# Usando la libreria Scipy
from scipy import spatial
d=spatial.distance.minkowski(v1, v2, p=p, w=None)
print("La distancia es :", d)

La distancia es: 6.928203230275509
La distancia es : 6.928203230275509


#2.-Normalización

In [None]:
#Normalizar un vector
def normalize(vector, particiones=4):

  # Creamos el RDD
  vectorRDD=sc.parallelize(vector,particiones)
  # Calculamos el módulo del vector
  moduloVectorRDD= (vectorRDD.map(lambda x:x**2).sum())**(1/2)
  # Normalizamos
  normalizeRDD=vectorRDD.map(lambda x : x/moduloVectorRDD )
  vector_normalizado=normalizeRDD.collect()
  return vector_normalizado

#Ejemplo de Normalizacion

In [None]:
v1=range(2,7)
normalize(v1,5)

[0.21081851067789195,
 0.31622776601683794,
 0.4216370213557839,
 0.5270462766947299,
 0.6324555320336759]

#3.-Similitud de Cosenos

In [None]:
# Calculamos el producto
def internalProduct(tupla):
  return tupla[0]*tupla[1]

In [None]:
# Calculo de la similitud de cosenos entre 2 vectores
def cosineSimilarity(v1,v2, particiones=4):
  
  v3=list(zip( v1, v2))
  # Creamos los RDD
  RDD_1=sc.parallelize(v1,particiones)
  RDD_2=sc.parallelize(v2,particiones)
  RDD_3=sc.parallelize(v3,particiones)
  # Calculamos el producto interno
  sumInternalProductRDD=RDD_3.map(lambda x: internalProduct(x)).sum()
  # Calculamos la norma de v1
  normV1RDD= RDD_1.map(lambda x:x**2).sum()
  # Calculamos la norma de v2
  normV2RDD= RDD_2.map(lambda x:x**2).sum()
  # calculo final
  Similarity=(sumInternalProductRDD) / ( normV1RDD**(1/2) * normV2RDD**(1/2) )
  return Similarity

#Ejemplo del calculo de Similitud de Cosenos

In [None]:
v1=list(range(2,5))
v2=list(range(6,9))
# Usando Spark 
print("El valor de la similitud de cosenos es :", cosineSimilarity(v1,v2))
# Usando la libreria Scipy
from scipy import spatial
result = 1 - spatial.distance.cosine(v1, v2)
print("El valor de la similitud de cosenos es :", result)

El valor de la similitud de cosenos es : 0.9888290654204228
El valor de la similitud de cosenos es : 0.9888290654204228


#4.-Escalonamiento

In [11]:
#Escalonamiento de un vector
def escalonamiento(vector, particiones=4):

  # Creamos el RDD
  vectorRDD=sc.parallelize(vector,particiones) 
  # Hallamos el valor minimo
  minVector=vectorRDD.min() 
  # Hallamos el valor máximo                    
  maxVector=vectorRDD.max()                     
  diferencia=maxVector-minVector                
  escalomaniemtoRDD= vectorRDD.map(lambda x : (x - maxVector)/ diferencia )
  # Recuperamos los valores
  vectorEscalonado=escalomaniemtoRDD.collect() 
  return vectorEscalonado 

#Ejemplo de Escalonamiento

In [12]:
v1 = range(1,6)
escalonamiento(v1,5)

[-1.0, -0.75, -0.5, -0.25, 0.0]

#5.-Estandarización

In [13]:
#Estandarizacion de un vector
def estandarizar(vector, particiones=4):
  
  # Creamos el RDD
  vectorRDD=sc.parallelize(vector,particiones)
  # Hallamos la media del vector
  meanVector=vectorRDD.mean() 
  # Hallamos la desviacion estandar                  
  dsVector=vectorRDD.stdev()                   
  estandarizacionRDD= vectorRDD.map(lambda x : (x - meanVector)/ dsVector )
  # Recuperamos los valores
  vectorEstandarizado=estandarizacionRDD.collect()  
  return vectorEstandarizado

#Ejemplo de Estandarizacion

In [14]:
v1 = range(2,7)
estandarizar(v1,5)

[-1.414213562373095,
 -0.7071067811865475,
 0.0,
 0.7071067811865475,
 1.414213562373095]