# Una introducción a Spark con Python

#### Rafael Caballero

Casi todo el código extraído del libro *Big Data con Python*

## 0: Preparación


#### Windows

El mismo código debería funcionar en otros sistemas 

* 
* Instrucciones para linux: https://blog.sicara.com/get-started-pyspark-jupyter-guide-tutorial-ae2fe84f594f
* Instrucciones para windows: https://medium.com/@ashish1512/how-to-setup-apache-spark-pyspark-on-jupyter-ipython-notebook-3330543ab307

En el laboratorio tenemos el problema de que no podemos hacer nuestras propias instalaciones; por ello podemos copiar la carpeta spark hlocal\tdm y ejecutar el siguiente código

In [1]:

# copiar la carpeta spark en c:\hlocal\tdm
import os
# cambiamos las variables del sistema
spark = 'C:\\hlocal\\tdm\\spark\\hadoop\\spark-2.3.2-bin-hadoop2.7'
# en el path se añade
path = os.environ.get('PATH') 
path = path+ ';'+spark+'\\bin;'
os.environ['PATH'] = path
os.environ['SPARK_HOME']= spark 
os.environ['HADOOP_HOME']= spark 
os.environ['PYSPARK_DRIVER_PYTHON']= 'jupyter'
os.environ['PYSPARK_DRIVER_PYTHON_OPTS']='notebook'

# si da problema con collect quizás haya que poner java_home
os.environ['JAVA_HOME']= 'C:\\JDK\\jdk8-64bits'
os.environ['PATH'] = os.environ.get('JAVA_HOME')+'\\bin;'+path



In [2]:
import findspark
findspark.init()

import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

df = spark.sql('''select 'spark' as hola ''')
df.show()


+-----+
| hola|
+-----+
|spark|
+-----+



## 1: Acciones

Al importar pyspark ya tenemos una variable spark con la conexión con Spark creada. Para manejar RDDs nos interesará extraer de ella el llamado *sparkContext*:

In [4]:
sc = spark.sparkContext
r = sc.parallelize([1,2,3]) # Crea un RDD con los valores 1,2,3

Veamos el tipo de la variable r

In [5]:
type(r)

pyspark.rdd.RDD

In [6]:
cuadrados = sc.parallelize([i*i for i in range(100)])
l = cuadrados.take(5)
l[2:]    

[4, 9, 16]

### Acciones  collect, take y count

Las *acciones* son las que lanzan un cómputo.
** collect ** recupera todo un RDD como si se tratara de un array

*Ojo*: Si hacemos esto nos traemos toda la colección al ordenador en el que estamos, ¡eso no es big data!

In [7]:
r = sc.parallelize([i*i for i in range(100)])
r.collect()  # Falla en algunas versiones de Java!!!!!!

[0,
 1,
 4,
 9,
 16,
 25,
 36,
 49,
 64,
 81,
 100,
 121,
 144,
 169,
 196,
 225,
 256,
 289,
 324,
 361,
 400,
 441,
 484,
 529,
 576,
 625,
 676,
 729,
 784,
 841,
 900,
 961,
 1024,
 1089,
 1156,
 1225,
 1296,
 1369,
 1444,
 1521,
 1600,
 1681,
 1764,
 1849,
 1936,
 2025,
 2116,
 2209,
 2304,
 2401,
 2500,
 2601,
 2704,
 2809,
 2916,
 3025,
 3136,
 3249,
 3364,
 3481,
 3600,
 3721,
 3844,
 3969,
 4096,
 4225,
 4356,
 4489,
 4624,
 4761,
 4900,
 5041,
 5184,
 5329,
 5476,
 5625,
 5776,
 5929,
 6084,
 6241,
 6400,
 6561,
 6724,
 6889,
 7056,
 7225,
 7396,
 7569,
 7744,
 7921,
 8100,
 8281,
 8464,
 8649,
 8836,
 9025,
 9216,
 9409,
 9604,
 9801]

Es mucho más normal recuperar solo unos cuantos elementos. De esto se encarga *take(n)* que devuelve los n primeros miembros del rdd

In [8]:
print(r.take(10))

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]


Por último, *count* nos dice de cuántos elementos consta un RDD

In [9]:
texto = sc.textFile('C:/hlocal/tdm/quijote.txt')

In [10]:
print(texto.count())
print(texto.take(10))

37861
['The Project Gutenberg EBook of Don Quijote, by Miguel de Cervantes Saavedra', '', 'This eBook is for the use of anyone anywhere at no cost and with', 'almost no restrictions whatsoever.  You may copy it, give it away or', 're-use it under the terms of the Project Gutenberg License included', 'with this eBook or online at www.gutenberg.net', '', '', 'Title: Don Quijote', '']


### Reduce, fold

*reduce* es una de las operaciones claves en Spark. 
Reduce todos los elementos de un RDD a uno solo. 
Para ello toma una función que aplica:
    * Primero a los dos primeros elementos, dando un resultado r1
    * Luego a r1 y al tercer elemento, dando r2
    * ... así hasta rn-1 y el último elemento, que dan el valor final

In [12]:
def add(x,y):
    print(x,y)
    return x+y

r = sc.parallelize(range(1,6))
print(r.reduce(add))

1 2
3 3
6 4
10 5
15


In [13]:
# Otra forma de lograr lo mismo: funciones lambda
print(r.reduce(lambda x,y: x+y))

15


fold() es similar a reduce, excepto porque toma un 'valor cero' inicial, al que podemos llamar z. 
Dado un valor z y una lista l, la función se aplica
    * Primero a z y al primer elemento de l, dando un resultado r1
    * Luego a r1 y al segundo elemento de l, dando r2
    * ... así hasta rn-1 y el último elemento, que dan el valor final

In [None]:
print(r.fold(0,lambda x,y: x+y)) #raro!!!

## 2: Transformaciones

### Map

Aplica una función a todos los miembros de un RDD. La salida es otro RDD con tantos elementos como tenía la entrada

In [19]:
def longitud(s):
    return len(s)
longitudes = texto.map(longitud)
print(longitudes.count())
longitudes.take(5)

37861


[75, 0, 64, 68, 67]

In [33]:
def divide(s):
    return s.split(" ")
##################################
texto\
.flatMap(divide)\
.map(lambda s:s.upper())\
.map(lambda s : (s,len(s)))\
.take(20)


[('THE', 3),
 ('PROJECT', 7),
 ('GUTENBERG', 9),
 ('EBOOK', 5),
 ('OF', 2),
 ('DON', 3),
 ('QUIJOTE,', 8),
 ('BY', 2),
 ('MIGUEL', 6),
 ('DE', 2),
 ('CERVANTES', 9),
 ('SAAVEDRA', 8),
 ('', 0),
 ('THIS', 4),
 ('EBOOK', 5),
 ('IS', 2),
 ('FOR', 3),
 ('THE', 3),
 ('USE', 3),
 ('OF', 2)]

### Filter

Aplica una función booleana a todos los miembros de un RDD. La salida es otro RDD que solo tiene los miembros para los que la función devuelve True

In [32]:
numeros = sc.parallelize(range(50))
numeros.filter(lambda n: (n%2) == 0).collect()

[0,
 2,
 4,
 6,
 8,
 10,
 12,
 14,
 16,
 18,
 20,
 22,
 24,
 26,
 28,
 30,
 32,
 34,
 36,
 38,
 40,
 42,
 44,
 46,
 48]

### Ejemplo complejo

In [35]:
def maxpareja(s1,s2):
    if s1[1]>s2[1]:
        r = s1
    else:
        r = s2
    return r
pareja = texto.flatMap(lambda line: line.split(" ")) \
             .map(lambda s:s.upper())\
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b)\
             .reduce(maxpareja)
        
print(pareja)

('QUE', 19470)


## Ejercicio

leer el quijote en un RDD y cuente el número de frases que contienen el texto 'merced'.
Nota: no importa que la palabra venga dentro de otra palabra, se contará igual
