# Instalamos e importamos librerías

In [1]:
#!pip install pyspark
#!pip install -U -q PyDrive
#!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

In [2]:
# from pydrive.auth import GoogleAuth
# from pydrive.drive import GoogleDrive
# from google.colab import auth
# from oauth2client.client import GoogleCredentials
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd

# Autenticamos con Google Drive

In [None]:
auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
drive = GoogleDrive(gauth)

# Bajamos archivo con la colección de Shakespeare

In [None]:
downloaded = drive.CreateFile({'id':"1ybtSQxrqVqbRrl_3FMzMYW03Flp4zM-j"})   # replace the id with id of file you want to access
downloaded.GetContentFile('s.txt') 

# Creamos el Spark Context

In [4]:
# create the Spark Session
spark = SparkSession.builder.getOrCreate()

# create the Spark Context
sc = spark.sparkContext

In [9]:
type(sc)

pyspark.context.SparkContext

# Lectura de datos en Spark

## Paralelizando una coleccion de python

In [10]:
## creamos 1000 enteros en una lista
integersList = range(1,1001)
len(integersList)

1000

In [11]:
## Paralelizamos la coleccion utilizando 8 particiones o slices
## Esta operacion es una transformacion de datos en un RDD
## Dado que Spark usa lazy evaluation, no corren jobs de Spark
## hasta el momento
integersListRDD = sc.parallelize(integersList, 8)
type(integersListRDD)

pyspark.rdd.PipelinedRDD

In [12]:
## podemos ver tambien otra informacion interesante del RDD
## el numero de particiones
integersListRDD.getNumPartitions()

8

In [13]:
## el conjunto de transformaciones que se aplica
integersListRDD.toDebugString()

b'(8) PythonRDD[1] at RDD at PythonRDD.scala:53 []\n |  ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:274 []'

In [None]:
## para ver mas metodos disponibles del RDD
help(integersListRDD)

In [15]:
integersListRDD.take(5)

[1, 2, 3, 4, 5]

In [16]:
integersListRDD.count()

1000

## Leyendo archivo con textFile

In [25]:
rdd = spark.sparkContext.textFile('shakespeare.txt')

In [26]:
rdd

shakespeare.txt MapPartitionsRDD[12] at textFile at NativeMethodAccessorImpl.java:0

In [27]:
rdd.count()

124614

In [29]:
rdd.take(5)

['1609', '', 'THE SONNETS', '', 'by William Shakespeare']

## Leyendo datos con el sqlContext

In [30]:
sqlContext = SQLContext(sc)

In [31]:
dataframe = sqlContext.read.text('shakespeare.txt')

In [32]:
dataframe

DataFrame[value: string]

In [33]:
rddCsv = dataframe.rdd

In [34]:
rddCsv

MapPartitionsRDD[20] at javaToPython at NativeMethodAccessorImpl.java:0

In [35]:
rddCsv.take(5)

[Row(value='1609'),
 Row(value=''),
 Row(value='THE SONNETS'),
 Row(value=''),
 Row(value='by William Shakespeare')]

Tambien se pueden leer archivos csv, json, parquet, jdbc, etc

# Acciones

## Count

Obtiene la cantidad de registros del RDD

In [37]:
integersListRDD.count()

1000

### Take

Obtiene los primeros n registros del RDD

In [42]:
integersListRDD.take(5)

[1, 2, 3, 4, 5]

## Collect

Obtiene TODOS los registros del RDD. Esto es un potencial problema, ya que si los datos no son acotados va a sobrecargar el driver. Solo se debe ejecutar si de antemano conocemos que la cantidad de datos es acotada.

In [None]:
integersListRDD.collect() # danger

## First

Obtiene el primer registro del RDD

In [43]:
integersListRDD.first()

1

## TakeOrdered

Obtiene los primeros n registros en base a un orden indicado.

In [44]:
integersListRDD.takeOrdered(5, key=lambda x: -x)

[1000, 999, 998, 997, 996]

## TakeSample

Obtiene una muestra de n registros con o sin reemplazo.

In [47]:
integersListRDD.takeSample(False, 5)

[838, 205, 744, 631, 561]

## Reduce

Obtiene un solo registro, combinando el resultado en base a una función dada.

Suma de todos los nros del RDD:

In [48]:
integersListRDD.reduce(lambda a,b: a+b)

500500

Número más grande del RDD:

In [49]:
integersListRDD.reduce(lambda a,b: a if a > b else b)

1000

## CountByKey

Cuenta ocurrencias de registros para cada clave.

En Spark para que un registro sea considerado con clave debe se una tupla de unicamente dos elementos. El primer elemento es la key y el segundo el valor. A su vez, la key y el valor pueden estar compuestos por tuplas.

Cuento cuántos nros múltiplo de 2 hay y cuántos no:

In [52]:
integersListRDD.map(lambda x: (x % 2, 'xd')).countByKey()

defaultdict(int, {1: 500, 0: 500})

# Transformaciones

### Map

Transforma cada registro en base a la función dada.

In [54]:
def por_dos(x):
    return x*2

In [58]:
integersListRDD.map(lambda x: x*2).take(5)

[2, 4, 6, 8, 10]

In [59]:
integersListRDD.map(lambda x: (x % 2, x)).take(5)

[(1, 1), (0, 2), (1, 3), (0, 4), (1, 5)]

## Filter

Filtra registros en base a la función dada.

In [60]:
integersListRDD.filter(lambda x: x % 2 == 0).take(5)

[2, 4, 6, 8, 10]

In [61]:
integersListRDD.filter(lambda x: x % 2 == 0).count()

500

## FlatMap

Similar a Map, pero cada registro puede generar 0, 1 o más registros.

Para cada registro original genero un nuevo registro con el nro, otro con el nro menos uno y otro con el registro más uno:

In [67]:
integersFlat = integersListRDD.flatMap(lambda x: [(x), (x-1), (x+1)])
integersFlat.count()

3000

## ReduceByKey

Combina los registros para una misma clave en base a una función de reduce.

La función de reduce debe ser **conmutativa** y **asociativa**.

Del RDD salida del flatMap cuento cuantos registros hay para cada nro:

In [70]:
integersFlat.map(lambda x: (x, 1)).reduceByKey(lambda a,b: a+b).count()

1002

In [71]:
integersFlat.map(lambda x: (x, 1)).reduceByKey(lambda a,b: a+b).take(10)

[(0, 1),
 (8, 3),
 (16, 3),
 (24, 3),
 (32, 3),
 (40, 3),
 (48, 3),
 (56, 3),
 (64, 3),
 (72, 3)]

In [78]:
integersFlat.map(lambda x: (x, 1)).reduceByKey(lambda a,b: a+b).reduce(lambda a,b: a if a > b else b)

(1001, 1)

## GroupByKey

Agrupa los registros para cada clave. Es similar a reduceByKey pero con 
groupByKey se obtiene todos los registros para cada clave.

Solo se debe utilizar si es necesario la información de cada registro y la cantidad de registros por clave no es demasiado grande.

GroupByKey es una transformación costosa.

Si se desea realizar una agregación, usar reduceByKey. Usar groupByKey para hacer una agregación esta MAL.

Necesito saber cuales son los nros múltiplos de 2 y cuales no:

In [None]:
integersListRDD.map(lambda x: (x % 2, x)).groupByKey().map(lambda x: (x[0], list(x[1]))).collect()

## Distinct

Elimina registros duplicados (todo el registro debe coincidir)

Del RDD trás aplicar flatMap, obtengo los registros únicos:

In [85]:
#integersFlat.distinct().collect()
integersFlat.distinct().count()


1002

# Ejemplos de transformaciones y acciones con los textos de Shakespeare

## Leo de a líneas

In [7]:
lines = spark.sparkContext.textFile('shakespeare.txt')

## Cantidad de líneas totales

In [8]:
lines.count()

124614

## Primeras 10 líneas

In [9]:
lines.take(10)

['1609',
 '',
 'THE SONNETS',
 '',
 'by William Shakespeare',
 '',
 '',
 '',
 '                     1',
 '  From fairest creatures we desire increase,']

## Obtengo las palabras de todas las líneas (flatMap)

In [11]:
words = lines.flatMap(lambda x: x.split())

In [12]:
words.take(10)

['1609',
 'THE',
 'SONNETS',
 'by',
 'William',
 'Shakespeare',
 '1',
 'From',
 'fairest',
 'creatures']

In [13]:
words.count()

902892

## Contando palabras (reduceByKey)

In [14]:
wordsCount = words.map(lambda x: (x.lower(),1))

In [15]:
wordsCount.take(10)

[('1609', 1),
 ('the', 1),
 ('sonnets', 1),
 ('by', 1),
 ('william', 1),
 ('shakespeare', 1),
 ('1', 1),
 ('from', 1),
 ('fairest', 1),
 ('creatures', 1)]

In [16]:
wordsCounted = wordsCount.reduceByKey(lambda x,y: x+y)

In [17]:
wordsCounted.take(10)

[('shakespeare', 258),
 ('1', 13),
 ('fairest', 39),
 ('creatures', 27),
 ('we', 3210),
 ('increase,', 9),
 ('thereby', 21),
 ("beauty's", 30),
 ('rose', 44),
 ('never', 959)]

In [18]:
wordsCounted.takeOrdered(10, lambda x: -x[1])

[('the', 27681),
 ('and', 26066),
 ('i', 19540),
 ('to', 18737),
 ('of', 18084),
 ('a', 14424),
 ('my', 12456),
 ('in', 10721),
 ('you', 10666),
 ('that', 10489)]

### Mal uso de groupByKey

In [19]:
wordsCount.groupByKey().takeOrdered(10, lambda x: -1 * len(x[1]))

[('the', <pyspark.resultiterable.ResultIterable at 0x7ffb18b82a90>),
 ('and', <pyspark.resultiterable.ResultIterable at 0x7ffb18b82af0>),
 ('i', <pyspark.resultiterable.ResultIterable at 0x7ffb18b824f0>),
 ('to', <pyspark.resultiterable.ResultIterable at 0x7ffb18b82b50>),
 ('of', <pyspark.resultiterable.ResultIterable at 0x7ffb18b82610>),
 ('a', <pyspark.resultiterable.ResultIterable at 0x7ffb18b82bb0>),
 ('my', <pyspark.resultiterable.ResultIterable at 0x7ffb18b82c10>),
 ('in', <pyspark.resultiterable.ResultIterable at 0x7ffb18b82190>),
 ('you', <pyspark.resultiterable.ResultIterable at 0x7ffb18b82c70>),
 ('that', <pyspark.resultiterable.ResultIterable at 0x7ffb18b82cd0>)]

In [None]:
wordsCount.groupByKey().map(lambda a: (a[0], list(a[1]))).take(5)

In [21]:
wordsCount.groupByKey().filter(lambda x: len(x[1]) < 5).map(lambda a: (a[0], list(a[1]))).take(5)

[('riper', [1, 1, 1]),
 ('memory:', [1]),
 ("feed'st", [1, 1, 1]),
 ("light's", [1]),
 ('fuel,', [1])]

## Palabra más larga (reduce)

In [31]:
words.reduce(lambda a, b: a if (len(a) > len(b)) else b)

'http://www.ibiblio.org/gutenberg/etext06'

## Palabras que empiezan con a (filter)

In [32]:
wordsA = words.filter(lambda word: word.startswith('a'))

In [34]:
wordsA.count()

63676

In [35]:
wordsA.take(10)

['as', 'a', 'abundance', 'art', 'and', 'a', 'asked,', 'all', 'all', 'an']

## Palabras únicas que empiezan con a (distinct)

In [38]:
wordsA.distinct().count()

2688

## Cantidad de palabras por frecuencia de repetición ordenados (sortByKey)

In [39]:
wordsCounted.take(5)

[('shakespeare', 258),
 ('1', 13),
 ('fairest', 39),
 ('creatures', 27),
 ('we', 3210)]

In [40]:
wordsFreq = wordsCounted.map(lambda x: (x[1],1))

In [42]:
wordsFreq.take(10)

[(258, 1),
 (13, 1),
 (39, 1),
 (27, 1),
 (3210, 1),
 (9, 1),
 (21, 1),
 (30, 1),
 (44, 1),
 (959, 1)]

In [43]:
wordsFreq.reduceByKey(lambda a,b: a+b).sortByKey().take(10)

[(1, 31072),
 (2, 8493),
 (3, 4342),
 (4, 2659),
 (5, 1822),
 (6, 1338),
 (7, 1053),
 (8, 779),
 (9, 700),
 (10, 549)]

In [46]:
wordsFreq.reduceByKey(lambda a,b: a+b).takeOrdered(10, lambda x: -x[1])

[(1, 31072),
 (2, 8493),
 (3, 4342),
 (4, 2659),
 (5, 1822),
 (6, 1338),
 (7, 1053),
 (8, 779),
 (9, 700),
 (10, 549)]

# Transformaciones entre dos RDD

## Union

Obtiene la unión entre dos RDD.

In [None]:
integersList2 = range(501,1501)
len(integersList2)

In [None]:
integersList2RDD = sc.parallelize(integersList2)

In [None]:
integersList2RDD.count()

In [None]:
integersListRDD.count()

In [None]:
union = integersListRDD.union(integersList2RDD)

In [None]:
union.take(5)

In [None]:
union.count()

## Intersection

Intersección entre dos RDD.

In [None]:
intersection = integersListRDD.intersection(integersList2RDD)

In [None]:
intersection.count()

In [None]:
intersection.take(10)

In [None]:
intersection.collect()

## Subtract

Elimina del primer RDD los registros que aparezcan en el segundo.

In [None]:
subtract = integersListRDD.subtract(integersList2RDD)

In [None]:
subtract.count()

In [None]:
subtract.collect()

## Joins

Con los joins se combinan dos RDD en base a las claves de los registros. Junta cada registro del primer RDD con cada registro del segundo RDD que tengan la misma clave. No agrupa, sino que es de a pares de registro.

In [None]:
data_alumnos = [
  (1,'Damian'),
  (2,'Luis'),
  (3,'Martin'),
  (4,'Natalia'),
  (5,'Joaquin')
]

alumnos = sc.parallelize(data_alumnos)

In [None]:
alumnos.collect()

In [None]:
data_materias_aprobadas = [
  (1, 'Algebra'),
  (2, 'Análisis Matemático'),
  (200, 'Algebra'),
  (2, 'Física')
]

materias_aprobadas = sc.parallelize(data_materias_aprobadas)

In [None]:
materias_aprobadas.collect()

### Inner Join (Join)

Cuando se llama para sets de datos del tipo (K,V) y (K,W) devuelve un set de datos del tipo (K, (V,W)) con todos los pares de elementos para cada key. (especificamente los que hay en comun por esa clave en ambos sets de datos)

In [None]:
alumnos.join(materias_aprobadas).collect()

### Left Outer Join

Cuando se llama para sets de datos del tipo (K,V) y (K,W) devuelve un set de datos del tipo (K, (V,W)) asegurandonos que todos los datos del set de datos izquierdo estaran en el resultado del join.

In [None]:
alumnos.leftOuterJoin(materias_aprobadas).collect()

### Right Outer Join

Cuando se llama para sets de datos del tipo (K,V) y (K,W) devuelve un set de datos del tipo (K, (V,W)) asegurandonos que todos los datos del set de datos derecho estaran en el resultado del join.

In [None]:
alumnos.rightOuterJoin(materias_aprobadas).collect()

### Outer/Full Join


Cuando se llama para sets de datos del tipo (K,V) y (K,W) devuelve un set de datos del tipo (K, (V,W)) asegurandonos que todos los datos de ambos set de datos estaran aunque no haya match de keys.

In [None]:
alumnos.fullOuterJoin(materias_aprobadas).collect()

### Broadcast Join (map-side join)

#### Variable Broadcast

Una variable Broadcast nos permite mantener una variable solo lectura cacheada en cada una de las maquinas del cluster en vez de enviar esa informacion con cada una de las tareas que se envian al cluster.

Esto es particularmente util cuando cuando tareas a partir de multiples etapas (stages) necesitan la misma información o cuando cachear información de forma deserializada es importante.

Tener en cuenta que esto **es posible** cuando uno de los data sets o conjunto de datos **es lo suficientemente pequeño para ser broadcasteado a todos los nodos/workers del cluster**.

In [47]:
# Vamos a suponer que tenemos un RDD de productos por sus IDs identificando ventas de los mismos
prodsList = [1,11,1,4,5,11,2,3,4,5,6,4,5,4,3,2,1,11,2,3,4,5,6,4,3,2,1,1]
prods = sc.parallelize(prodsList,3)

In [48]:
# Un hash con los productos y sus nombres
productNames = {1:'papas',
                2:'cebollas',
                3:'tomates',
                4:'zanahorias',
                5:'batatas',
                6:'peras',
                7:'cilantro',
                8:'apio',
                9:'morrones',
                10:'manzanas',
                11:'naranjas'}

# Hacemos un broadcast de la variable
bproductNames = sc.broadcast(productNames)

In [49]:
# Buscamos los productos que se vendieron más de 4 veces
popularProds = prods.map(lambda x:(x,1))\
    .reduceByKey(lambda x,y:x+y)\
    .filter(lambda x:x[1]>=4)
popularProds.collect()

[(3, 4), (1, 5), (4, 6), (5, 4), (2, 4)]

El join se realiza de forma implicita usando un map y dentro del mismo accediendo a la informacion de la variable a la que se realizo el broadcast via .value

In [50]:
popularProds = popularProds.map(
    lambda x:(bproductNames.value[x[0]],x[0],x[1]))
popularProds.collect()

[('tomates', 3, 4),
 ('papas', 1, 5),
 ('zanahorias', 4, 6),
 ('batatas', 5, 4),
 ('cebollas', 2, 4)]

#### Ventajas

Cuando un valor es "broadcasteado" al cluster, este es copiado a los nodos/workers **sólo una vez** (en vez de múltiples veces si la información fuera a enviarse en cada task). De esta forma se resuelve la consulta más rapidamente.

# Transformaciones sobre las particiones

In [None]:
rdd = sc.parallelize(range(1,11))
rdd.getNumPartitions()

In [None]:
sc.defaultParallelism

In [None]:
rdd.collect()

## Glom

Junta los registros de cada partición en una lista.

In [None]:
rdd.glom().collect()

## MapPartitions

Devuelve un nuevo RDD aplicando una función a cada partición del RDD.

In [None]:
def f(iterator): yield __builtin__.sum(iterator)
rdd.mapPartitions(f).collect()

## Repartition

Reshuffle los datos en el RDD de forma aleatoria para crear más o menos particiones y balancearlas. 

Hace un shuffle de todo los datos por la red.

In [None]:
rdd = sc.parallelize(range(1,11), 4)
rdd.getNumPartitions()

In [None]:
rdd.glom().collect()

In [None]:
rdd2 = rdd.repartition(2)

In [None]:
rdd2.getNumPartitions()

In [None]:
rdd2.glom().collect()

Spark no hace shuffle de registros individuales sino de a bloques con un mínimo (no es un problema cuando se manejan grandes cantidades de datos)

## Coalesce

Decrementa la cantidad de particiones del RDD.

No hace shuffle por defecto, solo pasa datos de una partición a otra.

No quedan balanceadas.

In [None]:
rddCoalesce = rdd.coalesce(2)

In [None]:
rddCoalesce.glom().collect()

## RepartitionAndSortWithinPartitions

Reparticiona un RDD de acuerdo a un particionador y ordena los registros en base a su clave.

Los registros deben tener clave.

Es más eficiente que hacer un repartition y luego un sort dentro de cada partición ya que realiza el sort en el mismo paso de shuffle.

In [None]:
rdd.map(lambda x: (x, x)).collect()

In [None]:
rdd.map(lambda x: (x, x)).glom().collect()

### Ascending

In [None]:
rdd.map(lambda x: (x, x)).repartitionAndSortWithinPartitions(2).glom().collect()

In [None]:
rdd.map(lambda x: (x % 3, x)).repartitionAndSortWithinPartitions(2).glom().collect()

In [None]:
rdd.map(lambda x: (x % 3, x)).repartitionAndSortWithinPartitions(2, ascending=False).glom().collect()

### PartitionFunc

In [None]:
rdd.map(lambda x: (x * 2, x)).repartitionAndSortWithinPartitions(2).glom().collect()

In [None]:
rdd.map(lambda x: (x * 2, x)).repartitionAndSortWithinPartitions(2, partitionFunc=lambda x: (x % 3)).glom().collect()

# Persistiendo RDD

## Cache

Cachea un RDD intermedio que va a ser utilizado varias veces de modo de evitar tener que ejecutar todas las transformaciones cada vez.

In [None]:
rdd = sc.parallelize(range(1,100000))

In [None]:
rddCached = rdd.map(lambda x: x*10).cache()

In [None]:
rddCached.count()

In [None]:
rddCached.take(10)

## SaveAsTextFile

Guarda un RDD a disco en un archivo de texto.

In [None]:
rdd.saveAsTextFile('numbers.txt')

In [None]:
rddN = sc.textFile('numbers.txt')

In [None]:
rddN.collect()

## SaveAsPickleFile

Guarda un RDD a disco en un archivo con los datos serializados.

In [None]:
rdd.saveAsPickleFile('numbers2.file')

In [None]:
rddN2 = sc.pickleFile('numbers2.file')

In [None]:
rddN2.take(10)