# Conociendo los RDDs


En este notebook trabajaremos con los RDDs que forma parte del Spark Core.La implementación de Spark Core es un **RDD (Resilient Distributed Dataset)** que es una colección de datos distribuidos en diferentes nodos del clúster que se procesan en paralelo.

Utilizaremos la API de PySpark, pero los conceptos aplican por igual a todas las APIs (Scala, R, etc)

### Inicialización de Spark en Notebooks

In [1]:
import findspark
findspark.init() #para inicializar

import pandas as pd
import pyspark

In [2]:
from pyspark.sql import SparkSession

### Crear el SparkSession y el SparkContext

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local[*]")\
        .appName('PySpark_training')\
        .getOrCreate() #devuelve una sesion existente sino existe la crea

In [3]:
spark = SparkSession.builder.getOrCreate() #genera lo mismo que antes solo que con los valores por default
sc = spark.sparkContext

### Crear un RDD de una colección

In [4]:
num = [1,2,3,4,5]

num_rdd = sc.parallelize(num) #funcion para paralelizar
num_rdd.collect() #con collect, recojemos toda la lista de nros

[1, 2, 3, 4, 5]

# Transformaciones
* Como sabemos, las Transformaciones son de naturaleza perezosa y no se ejecutarán hasta que se ejecute una Acción sobre ellas.
* Intentemos comprender las distintas transformaciones disponibles.

Para mas informacion, puedes apoyarte del siguiente link:
* https://keepcoding.io/blog/transformaciones-y-acciones-en-spark/ 


### map
* Esto mapeará su entrada a alguna salida basada en la función especificada en la función 

In [5]:
double_rdd = num_rdd.map(lambda x : x * 2) 
double_rdd.collect()

[2, 4, 6, 8, 10]

### filtro
* Para filtrar los datos en función de una determinada condición. Intentemos encontrar los números pares de num_rdd.

In [6]:
even_rdd = num_rdd.filter(lambda x : x % 2 == 0) #filtro por los elementos pares del rdd
even_rdd.collect()

[2, 4]

### distinct
* Esto devolverá elementos distintos de un RDD.

In [8]:
rdd1 = sc.parallelize([10, 11, 11, 13, 11, 10, 12])
dist_rdd = rdd1.distinct()
dist_rdd.collect()

[10, 11, 12, 13]

### reduceByKey
* Esta función reduce los pares de valores clave en función de las claves y una función determinada dentro de reduceByKey

In [9]:
pairs = [ ("a", 8), ("b", 3), ("c", 3), ("a", 5), ("b", 1), ("c", 4)]
pair_rdd = sc.parallelize(pairs)

output = pair_rdd.reduceByKey(lambda x, y : x + y)

result = output.collect()
print(*result, sep='\n')

('b', 4)
('c', 7)
('a', 13)


### sortByKey
* Esta función realizará la clasificación en un par (clave, valor) RDD basado en las claves. De forma predeterminada, la clasificación se realizará en orden ascendente.

In [11]:
pairs = [ ("a", 5), ("d", 7), ("c", 2), ("b", 3)]
raw_rdd = sc.parallelize(pairs)

sortkey_rdd = raw_rdd.sortByKey() #ascending=False
result = sortkey_rdd.collect()
print(*result,sep='\n')

# Para clasificar en orden descendente, pase  “ascending=False”.

('a', 5)
('b', 3)
('c', 2)
('d', 7)


# Acciones

* Las acciones son operaciones en RDD que se ejecutan inmediatamente. Mientras que las transformaciones devuelven otro RDD, las acciones devuelven estructuras de datos nativas 

### count
* Esto contará el número de elementos en el RDD dado.

In [12]:
num = sc.parallelize([1,2,4,5,2])
num.count()

5

### first
* Esto devolverá el primer elemento del RDD dado.

In [13]:
num.first()

1

### Collect
* Esto devolverá todos los elementos para el RDD dado.


In [14]:
num.collect()

[1, 2, 4, 5, 2]

### Take
* Esto devolverá el número de elementos especificados.

In [15]:
num.take(3)

[1, 2, 4]