# Algoritmos para Preprocesamiento

## Llamada a PYSPARK

In [58]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext
spark = SparkSession.builder.getOrCreate()
sc = SparkContext.getOrCreate()
import math


## 1. Normalización

In [59]:
def nor(x):
    """Normalizacion los elementos de un RDD
    Args:
        X (RDD):arreglo numeros
    Returns:
        RDD: normalizado
    """
    xRDD = sc.parallelize(x, 5)
    listaCuadrada=xRDD.map(lambda xi:xi*xi)
    total=listaCuadrada.sum()
    val=math.sqrt(total)
    #escalonamos cada elemento com map
    normalizado= xRDD.map(lambda xi :(xi/val))
    return normalizado

Calculamos normalizacion

In [60]:
Array=[12,6,87,45,121,6148]
Array_normalizado=nor(Array).collect()

Imprimimos resultados

In [61]:
print("Arreglo inicial")
print(Array)
print("Arreglo Normalizado")
print(Array_normalizado)

Arreglo inicial
[12, 6, 87, 45, 121, 6148]
Arreglo Normalizado
[0.0019512241810033615, 0.0009756120905016807, 0.01414637531227437, 0.007317090678762605, 0.01967484382511723, 0.9996771887340555]


# 2. Estandarizacion

In [62]:
def estandarizacion(x):
    """
    Estandariza los elementos de un arreglo
    Args:
        x (RDD): arreglo de números
        
    Returns:
        RDD : estandarizado
    """
    xRDD = sc.parallelize(x, 5)
    # obtenemos la media
    media = xRDD.mean()
    # obtenemos desviacion estandar
    desv = xRDD.stdev()
    # Estandarizamos con.map
    estandarizado = xRDD.map(lambda x : (x-media)/desv)
    return estandarizado

Calculamos estandarizacion

In [63]:
Array=[12,6,87,45,121,6148]
Array_estandarizado=estandarizacion(Array).collect()

Imprimimos resultados

In [64]:
print("Arreglo inicial")
print(Array)
print("Arreglo Estandarizado")
print(Array_estandarizado)

Arreglo inicial
[12, 6, 87, 45, 121, 6148]
Arreglo Estandarizado
[-0.46572232429077726, -0.4683638878128299, -0.4327027802651195, -0.45119372491948784, -0.4177339203068213, 2.235716637595036]


# 3. Escalonamiento

In [65]:
def escalonamiento(x):
    """Escalonar los elementos de un RDD
    
    Args:
        X (RDD numeric):array RDD numeric
    Returns:
        RDD: Vector escalonado
    """
    xRDD = sc.parallelize(x, 5)
    # obtenemos minimo
    min = xRDD.min()
    # obtenemos maximo
    max = xRDD.max()
    #escalonamos com .map
    lista= xRDD.map(lambda x : (x-min)/(max-min))
    return lista



Calculamos escalonamiento

In [66]:
Array=[12,6,87,45,121,6148]
array_escalonado = escalonamiento(Array).collect()

Imprimimos resultados

In [67]:
print("Array inicial")
print(Array)
print("Array escalonado")
print (array_escalonado)

Array inicial
[12, 6, 87, 45, 121, 6148]
Array escalonado
[0.0009768804949527841, 0.0, 0.013187886681862585, 0.006349723217193097, 0.01872354281992836, 1.0]


# 4. TF

In [68]:

def tf(data):
    """
    Determinar la frecuencia de términos de un documentos (TF)

    Args:
        doc (str): Un string.

    Returns:
        RDD: (palabra, frecuencia).
    """
    Doc = data.split()
    cantidad = len(Doc)
    # Volvemos este arreglo de tuplas en un RDD de 2 particiones
    terminosRDD = sc.parallelize(Doc,2)
    # calculamos ocurrencia de terminos en base a los que los contengan
    tfRDD = (terminosRDD.map(lambda x: (x,1))
           .reduceByKey(lambda x,y: x+y).map(lambda x: (x[0],1+math.log10(x[1]/cantidad))))
    return tfRDD.collect()

Realziamos prueba y mostramos resultados

In [69]:
# Pruebas tomando algunos ejemplos de twitts.txt
oracion1="music library sync new music etc itunes tell sync complete new uploaded fix frustrating"
oracion2="apple make bad charger genius say wrap electrical tape wonder jony think"
oracion3="amp delete music customer ipod tell"
oracion4="apple inc aapl price target eps estimate raise bmo"

print("=========== TEST TF =============")
print(tf(oracion1))
print("=================================")
print(tf(oracion2))
print("=================================")
print(tf(oracion3))
print("=================================")
print(tf(oracion4))

[('music', 0.1549019599857432), ('library', -0.14612803567823796), ('sync', 0.1549019599857432), ('new', 0.1549019599857432), ('itunes', -0.14612803567823796), ('tell', -0.14612803567823796), ('uploaded', -0.14612803567823796), ('fix', -0.14612803567823796), ('etc', -0.14612803567823796), ('complete', -0.14612803567823796), ('frustrating', -0.14612803567823796)]
[('make', -0.07918124604762489), ('genius', -0.07918124604762489), ('say', -0.07918124604762489), ('electrical', -0.07918124604762489), ('wonder', -0.07918124604762489), ('jony', -0.07918124604762489), ('think', -0.07918124604762489), ('apple', -0.07918124604762489), ('bad', -0.07918124604762489), ('charger', -0.07918124604762489), ('wrap', -0.07918124604762489), ('tape', -0.07918124604762489)]
[('delete', 0.22184874961635637), ('music', 0.22184874961635637), ('customer', 0.22184874961635637), ('tell', 0.22184874961635637), ('amp', 0.22184874961635637), ('ipod', 0.22184874961635637)]
[('inc', 0.04575749056067513), ('aapl', 0.04

# 5. IDF

In [70]:
def idf(data):
    """
    Determinar la frecuencia inversa de documento (IDF)
    
    Args:
        data (List of str): Una lista compuesta por strings.
    
    Returns:
        RDD:(palabra, frecuencia inversa)
    """
    cantidad = len(data)
    # Volvemos este arreglo de tuplas en un RDD de 2 particiones
    corpusRDD = sc.parallelize(data,2)
    # calculamos numero de documentos donde aparece terminos en base a la cantidad documentos
    idfRDD = (corpusRDD.flatMap(lambda x: list(set(x.split(" ")))).map(lambda x:(x,1))
            .reduceByKey(lambda x,y:x+y).map(lambda x: (x[0],math.log10(1+cantidad/x[1]))))
    return (idfRDD)

Realziamos prueba y mostramos resultados

In [71]:
# Pruebas tomando algunos ejemplos de twitts.txt
oracion1="music library sync new music etc itunes tell sync complete new uploaded fix frustrating"
oracion2="apple make bad charger genius say wrap electrical tape wonder jony think"
oracion3="amp delete music customer ipod tell"
oracion4="apple inc aapl price target eps estimate raise bmo"

print("=========== TEST IDF =============")
print(idf([oracion1]).collect())
print("=================================")
print(idf([oracion2]).collect())
print("=================================")
print(idf([oracion3]).collect())
print("=================================")
print(idf([oracion4]).collect())

[('new', 0.3010299956639812), ('itunes', 0.3010299956639812), ('sync', 0.3010299956639812), ('uploaded', 0.3010299956639812), ('music', 0.3010299956639812), ('tell', 0.3010299956639812), ('library', 0.3010299956639812), ('fix', 0.3010299956639812), ('etc', 0.3010299956639812), ('frustrating', 0.3010299956639812), ('complete', 0.3010299956639812)]
[('electrical', 0.3010299956639812), ('wonder', 0.3010299956639812), ('think', 0.3010299956639812), ('genius', 0.3010299956639812), ('make', 0.3010299956639812), ('jony', 0.3010299956639812), ('say', 0.3010299956639812), ('tape', 0.3010299956639812), ('charger', 0.3010299956639812), ('bad', 0.3010299956639812), ('wrap', 0.3010299956639812), ('apple', 0.3010299956639812)]
[('customer', 0.3010299956639812), ('delete', 0.3010299956639812), ('music', 0.3010299956639812), ('tell', 0.3010299956639812), ('amp', 0.3010299956639812), ('ipod', 0.3010299956639812)]
[('bmo', 0.3010299956639812), ('aapl', 0.3010299956639812), ('price', 0.3010299956639812),