# Clasificador K-NN en Spark usando pyspark.dataframes

### Se importan las librerías necesarias

In [1]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.ml.feature import MaxAbsScaler

import random
import math
import time
import numpy as np 

### Se crea la sesión y config. de Spark

In [2]:
conf = (SparkConf()
        .setAppName("Data exploration URL - KNN Spark RDD") \
        .set('spark.driver.cores', '6') \
        .set('spark.executor.cores', '6') \
        .set('spark.driver.memory', '6G') \
        .set('spark.master', 'local[6]') \
        .set('spark.sql.autoBroadcastJoinThreshold', '-1') \
        .set('spark.executor.memory', '6G'))
sc = SparkContext(conf=conf)

In [3]:
spark = SparkSession.builder.getOrCreate()

In [4]:
sc._conf.getAll()

[('spark.app.id', 'local-1619111166839'),
 ('spark.app.startTime', '1619111164576'),
 ('spark.executor.id', 'driver'),
 ('spark.driver.memory', '6G'),
 ('spark.master', 'local[6]'),
 ('spark.driver.cores', '6'),
 ('spark.executor.memory', '6G'),
 ('spark.executor.cores', '6'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.submit.pyFiles', ''),
 ('spark.driver.port', '46055'),
 ('spark.submit.deployMode', 'client'),
 ('spark.sql.autoBroadcastJoinThreshold', '-1'),
 ('spark.app.name', 'Data exploration URL - KNN Spark RDD'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.driver.host', 'fedora')]

In [5]:
sc

### Función para calcular el tiempo de ejecución

In [6]:
def tiempo(start, end):
    medida = 'segundos'
    tiempo = end - start
    if (tiempo >= 60):
        tiempo = tiempo / 60
        medida = 'minutos'
    else:
        if (tiempo >= 3600):
            tiempo = tiempo / 3600
            medida = 'horas'
    print("Tiempo de ejecución: ", round(tiempo, 2), medida)

### Calcular la distancia euclideana.
#### Summary:
        Se calcula la distancia entre las columnas de dos renglones de un dataset, funciona
        con argumentos provenientes de un renglón de un dataframe de Spark.
#### Args: 
        row1(numpy.ndarray): Recibe una instancia del dataset
        row2(pyspark.ml.linalg.SparseVector): Recibe una instancia del dataset

In [319]:
def euclidean_distance(row1, row2):
    distance = 0.0
    columns = len(row1[0])
    for column in range(columns):
        distance += pow(row1[0][column] - row2[column], 2)
    distance = math.sqrt(distance)
    return round(distance, 4)

### Obtener los vecinos más cercanos.
#### Summary: 
      Se recorre cada renglón del dataframe dado y se calcula la distancia entre cada 
      uno de estos y el renglón de prueba.
      El RDD "distances", almacenará las distancias calculadas, 
      posteriormente se ordena de modo ascendente y se almancenan los primeros k-elementos 
      en la lista "k_neighbors"

#### Args: 
      train(pyspark.rdd.RDD): Recibe el conjunto de entrenamiento
      test_row(numpy.ndarray): Recibe una instancia del conjunto de test
      k(int): Número de vecinos que se desean obtener

In [355]:
def get_neighbors(train, test_row, k):
    rdd_distances = train.map(lambda element: (element[0], euclidean_distance(test_row, element[1])))
    rdd_distances = rdd_distances.filter(lambda element: element[1] > 0.0)
    k_neighbors = rdd_distances.takeOrdered(k, key= lambda  x: x[1]) 
    return k_neighbors

In [351]:
def sort_neighbors(rdd_distances, k):
    lista = []
    for i in range(k):
        element = rdd_distances.min()
        rddAux = rdd_distances.filter(lambda x: x != element)
        lista.append(element)
    return lista

### Predecir las etiquetas usando k-nn.
#### Summary:
      Se obtiene la lista de los k-vecinos más cercanos, y se almacena el valor de
      la etiqueta en la lista "output_labels". Posteriormente se calcula el valor 
      promedio de las etiquetas y se almacena en la variable "prediction" y se retorna.

#### Args: 
      train(pyspark.rdd.RDD): Recibe el conjunto de entrenamiento
      test_row(numpy.ndarray): Recibe una instancia del conjunto de test
      k(int): Número de vecinos que se desean obtener

In [75]:
def predict_classification(train, test_row, k):
    neighbors = get_neighbors(train, test_row, k)
    output_labels = [row[0] for row in neighbors]
    prediction = max(set(output_labels), key=output_labels.count)
    return prediction

### Clacular el porcentaje de exactitud.
#### Summary:
      Esta función calcula el porcentaje de exactitud del uso de k-NN, comparando
      las etiquetas reales de las instancias del dataset de entrenamiento y las
      etiquetas obtenidas mediante la predicción usando k-NN.
#### Args: 
      real_labels(numpy.ndarray): Recibe el dataframe de test que contiene los
                                                    valores reales de las etiquetas
      predicted(list): Lista con las etiquetas obtenidas mediante K-NN

In [76]:
def accuracy(real_labels, predicted):
    correct = 0
    total_rows = len(real_labels)
    for i in range(total_rows):
        if(real_labels[i] == predicted[i]):
            correct += 1
    print("Correct labels: ", correct, 'of', (total_rows))
    accuracy = correct / float(total_rows)
    return accuracy

### Crear la función que calcule los vecinos más cercanos.
#### Summary:
      Se asignan los parámetros para calcular los k-vecinos más cercanos y hacer predicciones
      de las etiquetas a las que pertenecen, calculando la distancia entre las columnas de cada
      uno de los renglones del dataframe de "test" y el de "train", comparando las 
      reales con las otenidas por el clasificador y, finalmente, dado el porcentaje de exactitud obtenido. 
#### Args: 
      train(pyspark.rdd.RDD): Recibe el conjunto de entrenamiento
      test(pyspark.rdd.RDD): Recibe el conjunto de test
      k(int): Número de vecinos que se desean obtener

In [77]:
def k_nearest_neighbors(train, test, k):
    predictions = []
    total_test_rows = test.count()
    for index in range(total_test_rows):
        test_row = np.array(test.zipWithIndex().filter(lambda element: element[1] == index).map(lambda element: element[0][1]).collect(), dtype = object)
        output = predict_classification(train, test_row, k)
        predictions.append(output)
    labels_array = np.array(test.map(lambda x: x[0]).collect(), dtype = float)
    mean_accuracy = accuracy(labels_array, predictions)
    print("Mean accuracy: " + str(mean_accuracy))

## Se cargan los datos al dataframe 

In [404]:
# Load training data
data = spark.read.format("libsvm")\
    .option("header", "false")\
    .option("inferSchema","true")\
    .load("/home/jsarabia/Documents/IA/datasets/Data-exploration/807990_x_instances_30.svm")
    # .load("../data/url_svmlight/Dimension_100_x_1000.svm")

In [405]:
data.printSchema()

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)



### Normalización

In [406]:
scaler = MaxAbsScaler(inputCol="features", outputCol="features_norm")

# Compute summary statistics and generate MaxAbsScalerModel
scalerModel = scaler.fit(data)

# rescale each feature to range [-1, 1].
scaledData = scalerModel.transform(data)

scaledData = scaledData.drop("features")

In [409]:
#Dividir los datos en conjunto de train y de test
seed = 1234
splits = scaledData.randomSplit([0.7, 0.3], seed)

train = splits[0]
test = splits[1]

# Se asignan los RDD para el posterior procesamiento
rdd_train = train.rdd
rdd_test = test.rdd
# rdd_total = data.rdd

scaledData.head(1)

[Row(label=1.0, features_norm=SparseVector(763908, {1: 1.0, 3: 0.1923, 4: 0.3043, 5: 0.4, 10: 1.0, 16: 0.9776, 17: 0.9894, 18: 0.1567, 20: 0.0496, 21: 0.1488, 22: 0.1488, 23: 1.0, 24: 1.0, 35: 1.0, 36: 1.0, 43: 1.0, 44: 1.0, 47: 1.0, 49: 1.0, 53: 1.0, 55: 1.0, 63: 1.0, 69: 1.0, 71: 1.0, 73: 1.0, 75: 1.0, 83: 1.0, 89: 1.0, 91: 1.0, 93: 1.0, 95: 1.0, 103: 1.0, 109: 1.0, 111: 1.0, 130: 1.0, 132: 1.0, 140: 1.0, 146: 1.0, 148: 1.0, 287: 1.0, 331: 1.0, 332: 1.0, 333: 1.0, 338: 1.0, 339: 1.0, 340: 1.0, 359: 1.0, 360: 1.0, 385: 1.0, 387: 1.0, 389: 1.0, 593: 1.0, 604: 1.0, 695: 1.0, 729: 1.0, 763: 1.0, 764: 1.0, 765: 1.0, 841: 1.0, 1678: 1.0, 1739: 1.0, 1740: 1.0, 1741: 1.0, 4291: 1.0, 8340: 1.0, 34065: 1.0, 34066: 1.0, 34067: 1.0, 34068: 1.0, 47172: 1.0, 82032: 1.0, 82033: 1.0, 82034: 1.0, 82035: 1.0, 90693: 1.0, 90694: 1.0, 155152: 1.0, 155153: 1.0, 155154: 1.0, 155155: 1.0, 155156: 1.0, 155157: 1.0, 155158: 1.0, 155159: 1.0, 155162: 1.0, 155163: 1.0, 155164: 1.0, 155165: 1.0, 155166: 1.0, 15

In [410]:
rdd_test.count()

18

## Se invoca al método y se envían los parámetros

In [412]:
start_time = time.time()
k_nearest_neighbors(rdd_train, rdd_test, k = 5)
end_time = time.time()
print(tiempo(start_time, end_time))

ERROR:root:Internal Python error in the inspect module.
Below is the traceback from this internal error.

ERROR:root:Internal Python error in the inspect module.
Below is the traceback from this internal error.



Traceback (most recent call last):
  File "/home/jsarabia/miniconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3437, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-413-ecb90ce6a725>", line 2, in <module>
    k_nearest_neighbors(rdd_train, rdd_test, k = 5)
  File "<ipython-input-77-d5b66bd7d35b>", line 5, in k_nearest_neighbors
    test_row = np.array(test.zipWithIndex().filter(lambda element: element[1] == index).map(lambda element: element[0][1]).collect(), dtype = object)
  File "/opt/spark/python/pyspark/ml/linalg/__init__.py", line 735, in __getitem__
    insert_index = np.searchsorted(inds, index)
  File "<__array_function__ internals>", line 2, in searchsorted
KeyboardInterrupt

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/jsarabia/miniconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 2061, in showtraceback
    stb =


KeyboardInterrupt



## Prueba de cada método de K-NN  con el archivo Dimensión 5 x 76

In [95]:
# Se asignan los RDD para el posterior procesamiento
rdd_train = train.rdd
rdd_test = test.rdd
rdd_total = data.rdd

In [96]:
# Se agrega un índice a las instancias para poder recorrerlas posteriormente mediante un filtro.
rdd_index = rdd_total.zipWithIndex()
# Se selecciona solo la columna que contiene los valores de las características.
rdd_columns = rdd_total.map(lambda x: x[1])

In [97]:
# Se prueba el método de distancia euclideana con RDD
# Renglón no. 1
rdd_row1 = rdd_index.filter(lambda x: x[1] == 0)
# Se transforma en un array de Numpy solo con los valores de las columnas
row1 = np.array(rdd_row1.map(lambda element: element[0][1]).collect(), dtype = object)
# Las distancias se almacenan en un RDD
rdd_distances = rdd_columns.map(lambda x: euclidean_distance(row1, x))
start_time = time.time()
rdd_distances.collect()
end_time = time.time()
print(tiempo(start_time,end_time))

Tiempo de ejecución:  0.08 segundos
None


In [103]:
# Prueba de la función get_neighbors()
# Renglón no. 1
rdd_row1 = rdd_index.filter(lambda x: x[1] == 0)
# Se transforma en un array de Numpy solo con los valores de las columnas
row1 = np.array(rdd_row1.map(lambda element: element[0][1]).collect(), dtype = object)
start_time = time.time()
print(get_neighbors(rdd_total, row1, k = 5))
end_time = time.time()
print(tiempo(start_time,end_time))

8
8
8
8
8
0
Tiempo de ejecución:  0.69 segundos
None


In [120]:
# Prueba de la función predict_classification()
# Renglón no. 1
rdd_row1 = rdd_index.filter(lambda x: x[1] == 0)
# Se transforma en un array de Numpy solo con los valores de las columnas
row1 = np.array(rdd_row1.map(lambda element: element[0][1]).collect(), dtype = object)
start_time = time.time()
prediction = predict_classification(rdd_total, row1, 3)
print('Expected label: %d, Got: %d.' % (rdd_row1.take(1)[0][0][0], prediction))
end_time = time.time()
print(tiempo(start_time,end_time))

Expected label: 0, Got: 0.
Tiempo de ejecución:  0.6830856800079346
None


In [1]:
sc.stop()

NameError: name 'sc' is not defined

In [57]:
rdd = sc.parallelize(range(1,11))

In [58]:
rdd.collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

In [59]:
rdd.count()

10

In [60]:
rdd.min()

1

In [61]:
rdd = rdd.filter(lambda x: x != 3)

In [62]:
rdd.count()

9

In [64]:
rdd.collect()

[1, 2, 4, 5, 6, 7, 8, 9, 10]

In [65]:
rdd = rdd.filter(lambda x: x != 9)

In [66]:
rdd.collect()

[1, 2, 4, 5, 6, 7, 8, 10]

In [67]:
rdd = rdd.filter(lambda x: x != 1)

In [70]:
rdd.collect()

[2, 4, 5, 6, 7, 8, 10]

In [71]:
rdd_vecinos = sc.parallelize([])

### Prueba actual

In [323]:
rdd_vecinos_prb = rdd_vecinos

In [329]:
rdd_vecinos_prb.collect()

[(0.0, 2.2022),
 (1.0, 4.1898),
 (1.0, 3.8775),
 (1.0, 3.7552),
 (1.0, 3.7552),
 (1.0, 3.2155)]

In [330]:
element = rdd_vecinos_prb.min()

In [331]:
element

(0.0, 2.2022)

In [332]:
rdd_vecinos_prb = rdd_vecinos_prb.filter(lambda x: x != element)

In [333]:
rdd_vecinos_prb.collect()

[(0.0, 1.6999),
 (1.0, 4.1898),
 (1.0, 3.8775),
 (1.0, 3.7552),
 (1.0, 3.7552),
 (1.0, 3.2155)]

In [413]:
sc.stop()

ERROR! Session/line number was not unique in database. History logging moved to new session 619
