<a href="https://colab.research.google.com/github/FedericoCalonge/OrganizacionDatosRepoMateria/blob/master/SPARK/Notebook/SPARK_UBA.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 1-Inicio SPARK y Lectura de datos.

Vamos a ver cómo leer datos de un archivo con Spark y como hacerlo mediante transformaciones y acciones. 


## Instalamos e importamos librerías

In [None]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

openjdk-8-jdk-headless is already the newest version (8u252-b09-1~18.04).
The following package was automatically installed and is no longer required:
  libnvidia-common-440
Use 'apt autoremove' to remove it.
0 upgraded, 0 newly installed, 0 to remove and 35 not upgraded.


In [None]:
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd

## Autenticamos con Google Drive

In [None]:
#Esto lo hacemos para poder bajar archivos de Google Drive.
#Tenemos que entrar al link y colocar el código de verificación.
auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
drive = GoogleDrive(gauth)

In [None]:
#Esto es por si queremos montar nuestra Carpeta de Google al directorio de Files que vemos a la izquierda en el ícono "Archivos/Files" 
#(para así poder guardar o cargar archivos desde ahí).
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
!ls "/content/drive/My Drive"

'Backups Obligatorios Sincronizados'
'Colab Notebooks'
 ContinuacionDjangoCurso4.rar
'CSVs TP1-OrgaDeDatos'
'Documento sin título.gdoc'
'Grupo Tecnico IA'
'HTML - CSS - BOOTSTRAP - PYTHONDJANGO.rar'
 s.txt
'TL4 Ejemplos'
'TP1 - Organizacion de Datos - Undav_Team'


## Bajamos archivo con la colección de Shakespeare

In [None]:
#Acá bajamos el archivo s.txt que son textos de Sheckspeare que vamos a usar más adelante para algunos ejemplos.
downloaded = drive.CreateFile({'id':"1ybtSQxrqVqbRrl_3FMzMYW03Flp4zM-j"})   # Este es el ID del archivo que queremos tener acceso (que lo tiene el profe en su Drive). DESPUES VER COMO ACCEDER A ESE EN EL NAVEGADOR.
downloaded.GetContentFile("/content/drive/My Drive/Colab Notebooks/UBAs.txt")   #Este txt lo vemos a la izquierda yendo al ícono "Archivos/Files" y lo vemos en esa ruta que lo coloqué.

## Creamos el Spark Context

In [None]:
#Tenemos que crear el "Spark Context" para poder trabajar con Spark. 

# 1ro creamos una sesión de Spark:
spark = SparkSession.builder.getOrCreate()

# Y luego de esta variable spark (nuestra sesión) obtenemos el contexto de Spark:
sc = spark.sparkContext

#Y esta variable, sc, es la que vamos a utilizar para comunicarnos con Spark.

In [None]:
type(sc)

pyspark.context.SparkContext

## Lectura de datos en Spark

Vamos a ver cómo leer datos en Spark. 
Vamos a ver 3 formas de hacerlo.

### Forma 1 - Paralelizando una coleccion de python

In [None]:
#La idea de esta manera es tener una colección de datos en Python y paralelizarlos en un RDD con Spark.

integersList = range(1,1001) ## Creamos una lista de entero del 1 al 1000 (no incluye al 1001).
len(integersList)

1000

In [None]:
#Ahora para transformar esta variable de Python en un RDD lo que hacemos es llamar al método
#parallelize (que es de sc, osea del Spark Context). Si ponemos el puntero del mouse sobre la función parallelize
# nos tira la info de ese método. Nos dice que parallelize permite distribuir una colección de Python local
#en un RDD, y el 2do parametro (slice, en el que pusimos 8) es la cantidad de particiones en que queremos que divida 
#Spark ese RDD.

#De esta manera paralelizamos la coleccion utilizando 8 particiones o slices
#Esta operacion es una transformacion de datos en un RDD. Y dado que Spark usa lazy evaluation, no corren jobs de Spark hasta el momento.

integersListRDD = sc.parallelize(integersList, 8) #Entonces le pasamos la lista de enteros y le decimos que lo divida en 8 particiones. 
type(integersListRDD) #Entonces ahora en integersListRDD tenemos un RDD de Spark que podemos utilizarlo para luego procesarlo.

pyspark.rdd.PipelinedRDD

In [None]:
integersListRDD.getNumPartitions() #Para comprobar efectivamente que el RDD está dividido en 8 particiones (que fue lo que le indicamos cuando lo creamos)

8

In [None]:
integersListRDD.toDebugString() #Método que nos permite llamar a un RDD que tenga transformaciones 
#para ver dichas transformaciones que se aplican en el RDD. Como en nuestro caso es un RDD que recién creamos solo
#tenemos la operación de paralelización ('ParallelCollectionRDD'). 

b'(8) PythonRDD[1] at RDD at PythonRDD.scala:53 []\n |  ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:262 []'

In [None]:
help(integersListRDD) #Para ver más métodos disponibles para utilizar en estructuras RDDs.

Help on PipelinedRDD in module pyspark.rdd object:

class PipelinedRDD(RDD)
 |  Pipelined maps:
 |  
 |  >>> rdd = sc.parallelize([1, 2, 3, 4])
 |  >>> rdd.map(lambda x: 2 * x).cache().map(lambda x: 2 * x).collect()
 |  [4, 8, 12, 16]
 |  >>> rdd.map(lambda x: 2 * x).map(lambda x: 2 * x).collect()
 |  [4, 8, 12, 16]
 |  
 |  Pipelined reduces:
 |  >>> from operator import add
 |  >>> rdd.map(lambda x: 2 * x).reduce(add)
 |  20
 |  >>> rdd.flatMap(lambda x: [x, x]).reduce(add)
 |  20
 |  
 |  Method resolution order:
 |      PipelinedRDD
 |      RDD
 |      builtins.object
 |  
 |  Methods defined here:
 |  
 |  __init__(self, prev, func, preservesPartitioning=False, isFromBarrier=False)
 |      Initialize self.  See help(type(self)) for accurate signature.
 |  
 |  getNumPartitions(self)
 |      Returns the number of partitions in RDD
 |      
 |      >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
 |      >>> rdd.getNumPartitions()
 |      2
 |  
 |  id(self)
 |      A unique ID for this RD

In [None]:
integersListRDD.take(5) #Llamamos a una ACCIÓN de Spark (take) que lo que hace es obtener los
#primeros 5 registros del RDD. En nuestro caso los números del 1 al 5. 

[1, 2, 3, 4, 5]

In [None]:
integersListRDD.count() #Ahora ejecutamos otra ACCIÓN (count) que te devuelve la cantidad de registros del RDD.
#Como recordamos, integersListRDD lo creamos a partir de las variables en Python.

1000

### Forma 2 - Leyendo archivo con textFile

In [None]:
#De esta manera vamos a leer el archivo 's.txt' con la función 'textFile':
rdd = spark.sparkContext.textFile('s.txt')  #Acá solo ponemos 's.txt' porque es un archivo local del google colab, pero ahi podemos poner el path de nuestro archivo 
                                            #o tambien este archivo lo podemos obtener de un HDFs o de un Amazon F3, etc.

In [None]:
#Este textFile nos devuelve directamente un RDD. 
rdd

s.txt MapPartitionsRDD[5] at textFile at NativeMethodAccessorImpl.java:0

In [None]:
rdd.count()

124614

In [None]:
rdd.take(5)

['1609', '', 'THE SONNETS', '', 'by William Shakespeare']

### Forma 3 - Leyendo datos con el sqlContext

In [None]:
sqlContext = SQLContext(sc)

In [None]:
dataframe = sqlContext.read.text('s.txt')

In [None]:
dataframe

In [None]:
rddCsv = dataframe.rdd

In [None]:
rddCsv

In [None]:
rddCsv.take(5)

Tambien se pueden leer archivos csv, json, parquet, jdbc, etc

# 2-Acciones.

## Count

Obtiene la cantidad de registros del RDD

In [None]:
integersListRDD.count()

### Take

Obtiene los primeros n registros del RDD

In [None]:
integersListRDD.take(5)

## Collect

Obtiene TODOS los registros del RDD. Esto es un potencial problema, ya que si los datos no son acotados va a sobrecargar el driver. Solo se debe ejecutar si de antemano conocemos que la cantidad de datos es acotada.

In [None]:
integersListRDD.collect() # danger

## First

Obtiene el primer registro del RDD

In [None]:
integersListRDD.first()

## TakeOrdered

Obtiene los primeros n registros en base a un orden indicado.

In [None]:
integersListRDD.takeOrdered(5, key=lambda x: -x)

## TakeSample

Obtiene una muestra de n registros con o sin reemplazo.

In [None]:
integersListRDD.takeSample(False, 5)

## Reduce

Obtiene un solo registro, combinando el resultado en base a una función dada.

Suma de todos los nros del RDD:

In [None]:
integersListRDD.reduce(lambda a,b: a+b)

Número más grande del RDD:

In [None]:
integersListRDD.reduce(lambda a,b: a if a > b else b)

## CountByKey

Cuenta ocurrencias de registros para cada clave.

En Spark para que un registro sea considerado con clave debe se una tupla de unicamente dos elementos. El primer elemento es la key y el segundo el valor. A su vez, la key y el valor pueden estar compuestos por tuplas.

Cuento cuántos nros múltiplo de 2 hay y cuántos no:

In [None]:
integersListRDD.map(lambda x: (x % 2, 1)).countByKey()

# 3-Transformaciones.

### Map

Transforma cada registro en base a la función dada.

In [None]:
integersListRDD.map(lambda x: x*2).take(5)

In [None]:
integersListRDD.map(lambda x: (x % 2, x)).take(5)

## Filter

Filtra registros en base a la función dada.

In [None]:
integersListRDD.filter(lambda x: x % 2 == 0).take(5)

In [None]:
integersListRDD.filter(lambda x: x % 2 == 0).count()

## FlatMap

Similar a Map, pero cada registro puede generar 0, 1 o más registros.

Para cada registro original genero un nuevo registro con el nro, otro con el nro menos uno y otro con el registro más uno:

In [None]:
integersFlat = integersListRDD.flatMap(lambda x: [(x), (x-1), (x+1)])
integersFlat.count()

## ReduceByKey

Combina los registros para una misma clave en base a una función de reduce.

La función de reduce debe ser **conmutativa** y **asociativa**.

Del RDD salida del flatMap cuento cuantos registros hay para cada nro:

In [None]:
integersFlat.map(lambda x: (x, 1)).reduceByKey(lambda a,b: a+b).count()

In [None]:
integersFlat.map(lambda x: (x, 1)).reduceByKey(lambda a,b: a+b).take(10)

In [None]:
integersFlat.map(lambda x: (x, 1)).reduceByKey(lambda a,b: a+b).reduce(lambda a,b: a if a > b else b)

## GroupByKey

Agrupa los registros para cada clave. Es similar a reduceByKey pero con 
groupByKey se obtiene todos los registros para cada clave.

Solo se debe utilizar si es necesario la información de cada registro y la cantidad de registros por clave no es demasiado grande.

GroupByKey es una transformación costosa.

Si se desea realizar una agregación, usar reduceByKey. Usar groupByKey para hacer una agregación esta MAL.

Necesito saber cuales son los nros múltiplos de 2 y cuales no:

In [None]:
integersListRDD.map(lambda x: (x % 2, x)).groupByKey().map(lambda x: (x[0], list(x[1]))).collect()

## Distinct

Elimina registros duplicados (todo el registro debe coincidir)

Del RDD trás aplicar flatMap, obtengo los registros únicos:

In [None]:
integersFlat.distinct().count()


# 4- Más ejemplos de transformaciones y acciones con los textos de Shakespeare (con uso de wordsCount).

## Leo de a líneas

In [None]:
lines = spark.sparkContext.textFile('s.txt')

## Cantidad de líneas totales

In [None]:
lines.count()

## Primeras 10 líneas

In [None]:
lines.take(10)

## Obtengo las palabras de todas las líneas (flatMap)

In [None]:
words = lines.flatMap(lambda x: x.split())

In [None]:
words.take(10)

In [None]:
words.count()

## Contando palabras (reduceByKey)

In [None]:
wordsCount = words.map(lambda x: (x.lower(),1))

In [None]:
wordsCount.take(10)

In [None]:
wordsCounted = wordsCount.reduceByKey(lambda x,y: x+y)

In [None]:
wordsCounted.take(10)

In [None]:
wordsCounted.takeOrdered(10, lambda x: -x[1])

### Mal uso de groupByKey

In [None]:
wordsCount.groupByKey().takeOrdered(10, lambda x: -1 * len(x[1]))

In [None]:
wordsCount.groupByKey().map(lambda a: (a[0], list(a[1]))).take(5)

In [None]:
wordsCount.groupByKey().filter(lambda x: len(x[1]) < 5).map(lambda a: (a[0], list(a[1]))).take(5)

## Palabra más larga (reduce)

In [None]:
words.reduce(lambda a, b: a if (len(a) > len(b)) else b)

## Palabras que empiezan con a (filter)

In [None]:
wordsA = words.filter(lambda word: word.startswith('a'))

In [None]:
wordsA.count()

In [None]:
wordsA.take(10)

## Palabras únicas que empiezan con a (distinct)

In [None]:
wordsA.distinct().count()

## Cantidad de palabras por frecuencia de repetición ordenados (sortByKey)

In [None]:
wordsCounted.take(5)

In [None]:
wordsFreq = wordsCounted.map(lambda x: (x[1],1))

In [None]:
wordsFreq.take(10)

In [None]:
wordsFreq.reduceByKey(lambda a,b: a+b).sortByKey().take(10)

In [None]:
wordsFreq.reduceByKey(lambda a,b: a+b).takeOrdered(10, lambda x: -x[1])

# 5-Transformaciones entre dos RDD (con uso de Joins).


## Union

Obtiene la unión entre dos RDD.

In [None]:
integersList2 = range(501,1501)
len(integersList2)

In [None]:
integersList2RDD = sc.parallelize(integersList2)

In [None]:
integersList2RDD.count()

In [None]:
integersListRDD.count()

In [None]:
union = integersListRDD.union(integersList2RDD)

In [None]:
union.take(5)

In [None]:
union.count()

## Intersection

Intersección entre dos RDD.

In [None]:
intersection = integersListRDD.intersection(integersList2RDD)

In [None]:
intersection.count()

In [None]:
intersection.take(10)

In [None]:
intersection.collect()

## Subtract

Elimina del primer RDD los registros que aparezcan en el segundo.

In [None]:
subtract = integersListRDD.subtract(integersList2RDD)

In [None]:
subtract.count()

In [None]:
subtract.collect()

## Joins

Con los joins se combinan dos RDD en base a las claves de los registros. Junta cada registro del primer RDD con cada registro del segundo RDD que tengan la misma clave. No agrupa, sino que es de a pares de registro.

In [None]:
data_alumnos = [
  (1,'Damian'),
  (2,'Luis'),
  (3,'Martin'),
  (4,'Natalia'),
  (5,'Joaquin')
]

alumnos = sc.parallelize(data_alumnos)

In [None]:
alumnos.collect()

In [None]:
data_materias_aprobadas = [
  (1, 'Algebra'),
  (2, 'Análisis Matemático'),
  (200, 'Algebra'),
  (2, 'Física')
]

materias_aprobadas = sc.parallelize(data_materias_aprobadas)

In [None]:
materias_aprobadas.collect()

### Inner Join (Join)

Cuando se llama para sets de datos del tipo (K,V) y (K,W) devuelve un set de datos del tipo (K, (V,W)) con todos los pares de elementos para cada key. (especificamente los que hay en comun por esa clave en ambos sets de datos)

In [None]:
alumnos.join(materias_aprobadas).collect()

### Left Outer Join

Cuando se llama para sets de datos del tipo (K,V) y (K,W) devuelve un set de datos del tipo (K, (V,W)) asegurandonos que todos los datos del set de datos izquierdo estaran en el resultado del join.

In [None]:
alumnos.leftOuterJoin(materias_aprobadas).collect()

### Right Outer Join

Cuando se llama para sets de datos del tipo (K,V) y (K,W) devuelve un set de datos del tipo (K, (V,W)) asegurandonos que todos los datos del set de datos derecho estaran en el resultado del join.

In [None]:
alumnos.rightOuterJoin(materias_aprobadas).collect()

### Outer/Full Join


Cuando se llama para sets de datos del tipo (K,V) y (K,W) devuelve un set de datos del tipo (K, (V,W)) asegurandonos que todos los datos de ambos set de datos estaran aunque no haya match de keys.

In [None]:
alumnos.fullOuterJoin(materias_aprobadas).collect()

### Broadcast Join (map-side join)

#### Variable Broadcast

Una variable Broadcast nos permite mantener una variable solo lectura cacheada en cada una de las maquinas del cluster en vez de enviar esa informacion con cada una de las tareas que se envian al cluster.

Esto es particularmente util cuando cuando tareas a partir de multiples etapas (stages) necesitan la misma información o cuando cachear información de forma deserializada es importante.

Tener en cuenta que esto **es posible** cuando uno de los data sets o conjunto de datos **es lo suficientemente pequeño para ser broadcasteado a todos los nodos/workers del cluster**.

In [None]:
# Vamos a suponer que tenemos un RDD de productos por sus IDs identificando ventas de los mismos
prodsList = [1,11,1,4,5,11,2,3,4,5,6,4,5,4,3,2,1,11,2,3,4,5,6,4,3,2,1,1]
prods = sc.parallelize(prodsList,3)

In [None]:
# Un hash con los productos y sus nombres
productNames = {1:'papas',
                2:'cebollas',
                3:'tomates',
                4:'zanahorias',
                5:'batatas',
                6:'peras',
                7:'cilantro',
                8:'apio',
                9:'morrones',
                10:'manzanas',
                11:'naranjas'}

# Hacemos un broadcast de la variable
bproductNames = sc.broadcast(productNames)

In [None]:
# Buscamos los productos que se vendieron más de 4 veces
popularProds = prods.map(lambda x:(x,1))\
    .reduceByKey(lambda x,y:x+y)\
    .filter(lambda x:x[1]>=4)
popularProds.collect()

El join se realiza de forma implicita usando un map y dentro del mismo accediendo a la informacion de la variable a la que se realizo el broadcast via .value

In [None]:
popularProds = popularProds.map(
    lambda x:(bproductNames.value[x[0]],x[0],x[1]))
popularProds.collect()

#### Ventajas

Cuando un valor es "broadcasteado" al cluster, este es copiado a los nodos/workers **sólo una vez** (en vez de múltiples veces si la información fuera a enviarse en cada task). De esta forma se resuelve la consulta más rapidamente.

# 6-Transformaciones sobre las particiones

In [None]:
rdd = sc.parallelize(range(1,11))
rdd.getNumPartitions()

In [None]:
sc.defaultParallelism

In [None]:
rdd.collect()

## Glom

Junta los registros de cada partición en una lista.

In [None]:
rdd.glom().collect()

## MapPartitions

Devuelve un nuevo RDD aplicando una función a cada partición del RDD.

In [None]:
def f(iterator): yield __builtin__.sum(iterator)
rdd.mapPartitions(f).collect()

## Repartition

Reshuffle los datos en el RDD de forma aleatoria para crear más o menos particiones y balancearlas. 

Hace un shuffle de todo los datos por la red.

In [None]:
rdd = sc.parallelize(range(1,11), 4)
rdd.getNumPartitions()

In [None]:
rdd.glom().collect()

In [None]:
rdd2 = rdd.repartition(2)

In [None]:
rdd2.getNumPartitions()

In [None]:
rdd2.glom().collect()

Spark no hace shuffle de registros individuales sino de a bloques con un mínimo (no es un problema cuando se manejan grandes cantidades de datos)

## Coalesce

Decrementa la cantidad de particiones del RDD.

No hace shuffle por defecto, solo pasa datos de una partición a otra.

No quedan balanceadas.

In [None]:
rddCoalesce = rdd.coalesce(2)

In [None]:
rddCoalesce.glom().collect()

## RepartitionAndSortWithinPartitions

Reparticiona un RDD de acuerdo a un particionador y ordena los registros en base a su clave.

Los registros deben tener clave.

Es más eficiente que hacer un repartition y luego un sort dentro de cada partición ya que realiza el sort en el mismo paso de shuffle.

In [None]:
rdd.map(lambda x: (x, x)).collect()

In [None]:
rdd.map(lambda x: (x, x)).glom().collect()

### Ascending

In [None]:
rdd.map(lambda x: (x, x)).repartitionAndSortWithinPartitions(2).glom().collect()

In [None]:
rdd.map(lambda x: (x % 3, x)).repartitionAndSortWithinPartitions(2).glom().collect()

In [None]:
rdd.map(lambda x: (x % 3, x)).repartitionAndSortWithinPartitions(2, ascending=False).glom().collect()

### PartitionFunc

In [None]:
rdd.map(lambda x: (x * 2, x)).repartitionAndSortWithinPartitions(2).glom().collect()

In [None]:
rdd.map(lambda x: (x * 2, x)).repartitionAndSortWithinPartitions(2, partitionFunc=lambda x: (x % 3)).glom().collect()

# 7-Persistiendo RDD (Persistencia y CACHE).

## Cache

Cachea un RDD intermedio que va a ser utilizado varias veces de modo de evitar tener que ejecutar todas las transformaciones cada vez.

In [None]:
rdd = sc.parallelize(range(1,100000))

In [None]:
rddCached = rdd.map(lambda x: x*10).cache()

In [None]:
rddCached.count()

In [None]:
rddCached.take(10)

## SaveAsTextFile

Guarda un RDD a disco en un archivo de texto.

In [None]:
rdd.saveAsTextFile('numbers.txt')

In [None]:
rddN = sc.textFile('numbers.txt')

In [None]:
rddN.collect()

## SaveAsPickleFile

Guarda un RDD a disco en un archivo con los datos serializados.

In [None]:
rdd.saveAsPickleFile('numbers2.file')

In [None]:
rddN2 = sc.pickleFile('numbers2.file')

In [None]:
rddN2.take(10)