# Variables broadcast

Las variables broadcast son variables de sólo lectura que se mandan una sola vez a cada nodo en lugar de ser enviadas con cada una de las tareas.

## ¿Cómo implementamos variables broadcast?
### Cómo se define la variable broadcast

Para definir una variable broadcast necesitamos llamarla a través del contexto de Spark

In [1]:
val broadcastVar = sc.broadcast(Array(1, 2, 3))

broadcastVar = Broadcast(0)


Broadcast(0)

### Cómo consultar la variable broadcast
Para poder consultar su valor utilizamos el método value

In [2]:
broadcastVar.value

Array(1, 2, 3)

### Cómo se destruye una variable broadcast
Antes de destruir una variable broadcast se sugiere qitar su persistencia

In [4]:
broadcastVar.unpersist

Y ya después eliminarla

In [8]:
broadcastVar.destroy

## Escenario hipotético 1
Supongamos que queremos contar los elementos gramaticales de un párrafo y tenemos un Mapa de cada palabra con su categoría gramatical.

Para ello, supongamos que tenemos un diccionario como este (si nos va bien):


### Sin broadcast

In [9]:
 val dictionary = Map(("man"-> "noun"), ("is"->"verb"),("mortal"->"adjective"))

dictionary = Map(man -> noun, is -> verb, mortal -> adjective)


Map(man -> noun, is -> verb, mortal -> adjective)

Supongamos también que tenemos una función que cuenta cuántas veces encuentra estos elementos gramaticales en nuestro párrafo 


In [10]:
 def getElementsCount(word :String, dictionary:Map[String,String]):(String,Int) = {
dictionary.filter{ case (wording,wordType) => wording.equals((word))}.map(x => (x._2,1)).headOption.getOrElse(("unknown" -> 1)) //some dummy logic
}

getElementsCount: (word: String, dictionary: Map[String,String])(String, Int)


Y usamos esa función para contar cada elemento gramatical encontrado con un set de datos definido:

In [13]:
val words = sc.parallelize(Array("man","is","mortal","mortal","1234","789","456","is","man"))
val grammarElementCounts = words.map( word => getElementsCount(word,dictionary)).reduceByKey((x,y) => x+y)

words = ParallelCollectionRDD[0] at parallelize at <console>:31
grammarElementCounts = ShuffledRDD[2] at reduceByKey at <console>:32


ShuffledRDD[2] at reduceByKey at <console>:32

¿Hizo lo que queríamos?

In [14]:
grammarElementCounts.collect

Array((noun,2), (unknown,3), (adjective,2), (verb,2))

Sí, pero, aunque a nivel local está bien, puede ser problemático cuando lo queramos implementar en el clúster porque se requeriría ese diccionario en cada tarea en todos y cada uno de los ejecutores (tal vez un diccionariod e este tamaño no importa mucho, pero qué va  apasar cuando tengamos un diccionario de miles o decenas o centenas de miles de elementos).

### Con broadcast
Tengo el mismo diccionario, sólo que tengo que convertirlo en una variable broadcast.

In [15]:
val broadCastDictionary = sc.broadcast(dictionary)

broadCastDictionary = Broadcast(3)


Broadcast(3)

Cambiamos un poco nuestra función getElementsCount

In [17]:
def getElementsCount(word :String, dictionary:org.apache.spark.broadcast.Broadcast[Map[String,String]]):(String,Int) = {
dictionary.value.filter{ case (wording,wordType) => wording.equals((word))}.map(x => (x._2,1)).headOption.getOrElse(("unknown" -> 1))
}

getElementsCount: (word: String, dictionary: org.apache.spark.broadcast.Broadcast[Map[String,String]])(String, Int)


En lugar del diccionario crudo pasamos el diccionario broadbast

In [18]:
val words = sc.parallelize(Array("man","is","mortal","mortal","1234","789","456","is","man"))
val grammarElementCounts = words.map( word => getElementsCount(word,broadCastDictionary)).reduceByKey((x,y) => x+y)

words = ParallelCollectionRDD[3] at parallelize at <console>:33
grammarElementCounts = ShuffledRDD[5] at reduceByKey at <console>:34


ShuffledRDD[5] at reduceByKey at <console>:34

In [19]:
grammarElementCounts.collect

Array((noun,2), (unknown,3), (adjective,2), (verb,2))

## Escenario hipotético 2

### Sin broadcast

In [25]:
val names = sc.textFile("names").map(line => (line.split(",")(3),line))

names = MapPartitionsRDD[14] at map at <console>:27


lastException: Throwable = null


MapPartitionsRDD[14] at map at <console>:27

In [34]:
val addresses = sc.textFile("address").map(line => (line.split(",")(0),line))

addresses = MapPartitionsRDD[26] at map at <console>:27


lastException: Throwable = null


MapPartitionsRDD[26] at map at <console>:27

In [35]:
addresses.collect().toMap

Map(001 -> 001,india,Dheli, 002 -> 002,usa,Springfield, 003 -> 003,india,Calcuta)

In [40]:
names.join(addresses).take(5)

Array((002,(Ursula,Fuentes,Berain,002,002,usa,Springfield)), (003,(Mariana,Orantes,Garcia,003,003,india,Calcuta)), (001,(Libertad,Badillo,Hernandez,001,001,india,Dheli)))

Entonces, con el join anterior tanto names como addresses sufrirían de un shuffling en el clúster para poder llevar a cabo el join.

### Alternativa 1
Mandando el RDD más pequeño a cada nodo del ejecutor

In [42]:
val addressesMap = addresses.collect().toMap
val joined = names.map(v=>(v._2,(addressesMap(v._1))))

addressesMap = Map(001 -> 001,india,Dheli, 002 -> 002,usa,Springfield, 003 -> 003,india,Calcuta)
joined = MapPartitionsRDD[37] at map at <console>:32


lastException: Throwable = null


MapPartitionsRDD[37] at map at <console>:32

In [46]:
joined.take(3)

lastException: Throwable = null


Array((Libertad,Badillo,Hernandez,001,001,india,Dheli), (Ursula,Fuentes,Berain,002,002,usa,Springfield), (Mariana,Orantes,Garcia,003,003,india,Calcuta))

Esto tampoco resuelve del todo la situación, ya que para cada tarea en cada nodo podríamos estar mandando una gran cantidad de datos, lo cual hace este procedimiento ineficiente.


## Fuentes teóricas y de los ejercicios
* https://books.japila.pl/apache-spark-internals/apache-spark-internals/2.4.4/spark-broadcast.html
* https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#broadcast-variables
* https://blog.knoldus.com/broadcast-variables-in-spark-how-and-when-to-use-them/
* http://www.prathapkudupublog.com/2018/06/accumulators-and-broadcast-variables-in.html
* https://vishnuviswanath.com/spark_rdd_part2.html
