# Contents

* My programming journey: procedural -> object-oriented -> functional
* MapReduce as a simplification of distributed computing
* Rules for distributing work:
    * Pure functions (no mutability)
    * No exceptions (monads: Option and Try)
* Local practice
    * map
    * flatMap
    * reduce
    * Mutability and race conditions
    * Exception handling
* Streaming
    * akka-streams
    * dataflow / scio
    * apache spark

# My journey: Basic -> C# -> Scala

## Procedural
GOTO?

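## OOP: classes and objects
Perfect, until we need to use several processors. Shared mutable objects have to be protected with locks/mutexes, and those locks make threads wait for each other, which limits how well we can use multiple processors.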
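A minimal sketch of that lock-based style, assuming plain JVM threads and a script/REPL environment such as the Ammonite kernel used in this notebook (`total`, `lock` and `increment` are illustrative names): the lock keeps the count correct, but every increment waits on the same lock, so the four threads effectively take turns.

```scala
// Shared mutable state: one counter that every thread updates.
var total = 0
val lock = new Object

// The lock makes each update atomic, but only one thread can hold it at a time.
def increment(): Unit = lock.synchronized { total += 1 }

// Four threads, 1000 increments each.
val threads = (1 to 4).map { _ =>
  new Thread(() => (1 to 1000).foreach(_ => increment()))
}
threads.foreach(_.start())
threads.foreach(_.join())

println(total) // 4000
```

Without `synchronized`, concurrent `total += 1` updates can overwrite each other and the final count can end up below 4000.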

## Functional
Uncomfortable and hard to reason about at first. In my experience, it is the best path to get the most performance out of multiple processors: without shared mutable state there is nothing to lock.

# MapReduce as a simplification of distributed computing
How would you process big data if you could only use people to do all the processing?

* Teach each person how to process the data (the function/algorithm)
* Split the work into chunks so part of the data can be handed to each person (partitioning; redistributing intermediate results by key between map and reduce is called the shuffle)
* Send the data to the people so they process it and give us back the result (map or reduce)
* Put all the results back together
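The four steps above can be sketched on a single machine, assuming a thread pool stands in for the people. `processChunk`, the chunk size of 25 and the sum-of-squares job are illustrative choices, not part of any MapReduce framework.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// 1. What every worker is taught: a pure function over one chunk (sum of squares).
def processChunk(chunk: Seq[Int]): Int = chunk.map(x => x * x).sum

// 2. Split the data into chunks.
val data = (1 to 100).toVector
val chunks = data.grouped(25).toVector

// 3. Hand each chunk to a worker (a Future on the global thread pool).
val partials: Vector[Future[Int]] = chunks.map(chunk => Future(processChunk(chunk)))

// 4. Gather the partial results and combine them.
val sumOfSquares: Int = Await.result(Future.sequence(partials), 10.seconds).sum

println(sumOfSquares) // 338350
```

Because `processChunk` only looks at its own chunk, the chunks can be processed in any order and the combined result is always the same.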

## Map: N -> N (chunk -> chunk)
N records go in and the same number of records come out.
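A small illustration with made-up `records`: `map` applies one function to each record independently, so the output has exactly as many elements as the input.

```scala
// Three records in...
val records = Vector("ana", "bob", "carla")

// ...three records out: one length per name, computed independently.
val lengths = records.map(_.length)

println(lengths) // Vector(3, 3, 5)
```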

## Reduce: N -> M (chunk -> value)
N records go in and M records come out. Often a single record comes out per chunk.
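A sketch of both shapes with the standard library, assuming Scala 2.13 or later for `groupMapReduce`: `reduce` collapses N values into one, while grouping by key and reducing each group yields M values, one per key (here a word count over made-up data).

```scala
val words = List("map", "reduce", "map", "shuffle", "map", "reduce")

// N -> 1: a single value for the whole collection (total number of characters).
val totalChars: Int = words.map(_.length).reduce(_ + _)

// N -> M: one value per distinct key (how many times each word appears).
val counts: Map[String, Int] = words.groupMapReduce(identity)(_ => 1)(_ + _)

println(totalChars) // 28
println(counts("map")) // 3
```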


# Rules for distributing work

## Pure functions
Each value must be computed independently of the previous ones, using only its own inputs and no shared mutable state, to avoid dependencies between workers and race conditions.
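A minimal contrast using hypothetical `label` and `labelImpure` functions: the pure one can be evaluated for any record, on any worker, in any order, while the impure one returns different results for the same input depending on how many calls happened before.

```scala
// Pure: the output depends only on the arguments.
def label(name: String, index: Int): String = s"$name-$index"

// Impure: the output also depends on a shared counter that every call modifies.
var counter = 0
def labelImpure(name: String): String = {
  counter += 1
  s"$name-$counter"
}

val pure = List("a", "b", "c").zipWithIndex.map { case (n, i) => label(n, i) }

val first = labelImpure("a")  // "a-1"
val second = labelImpure("a") // "a-2": same input, different output
```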

## Don't use exceptions
Errors must be captured without throwing an exception into the system (otherwise one of the workers could crash and leave the job in an uncertain state).
Monads (Option/Try) make this easier (more on them in a few minutes).
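Before the detailed walkthrough, a minimal sketch of the idea, assuming a hypothetical `parseRecord` task: `Try` turns a thrown exception into a `Failure` value, so the worker keeps going and the errors can be inspected afterwards, while `toOption` keeps only "a value or nothing".

```scala
import scala.util.{Try, Success, Failure}

// Hypothetical worker task: parse one record. A NumberFormatException becomes a
// Failure value instead of propagating and crashing the worker.
def parseRecord(raw: String): Try[Int] = Try(raw.trim.toInt)

val batch = List("10", "x", "30")
val results: List[Try[Int]] = batch.map(parseRecord)

// Successful values and error descriptions, separated after the fact.
val ok: List[Int] = results.collect { case Success(v) => v }
val errors: List[String] = results.collect { case Failure(e) => e.getClass.getSimpleName }

// Option drops the error detail: it only says "value" or "no value".
val firstValue: Option[Int] = results.headOption.flatMap(_.toOption)
```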

# Local practice

In [1]:
// Load dependencies
import $ivy.`org.scala-lang.modules:scala-parallel-collections_2.13:1.0.3`
import scala.collection.parallel.CollectionConverters._
import scala.util.{Try, Success, Failure}

[32mimport [39m[36m$ivy.$                                                             
[39m
[32mimport [39m[36mscala.collection.parallel.CollectionConverters._
[39m
[32mimport [39m[36mscala.util.{Try, Success, Failure}[39m

## map

In [2]:
val duplica: Int => Int = { i => i * 2 }
def triplica(v: Int): Int = v * 3

(1 to 10).map(duplica)
(1 to 10).map(triplica)

[36mduplica[39m: [32mInt[39m => [32mInt[39m = ammonite.$sess.cmd1$Helper$$Lambda$2385/1240126282@3075689a
defined [32mfunction[39m [36mtriplica[39m
[36mres1_2[39m: [32mIndexedSeq[39m[[32mInt[39m] = [33mVector[39m([32m2[39m, [32m4[39m, [32m6[39m, [32m8[39m, [32m10[39m, [32m12[39m, [32m14[39m, [32m16[39m, [32m18[39m, [32m20[39m)
[36mres1_3[39m: [32mIndexedSeq[39m[[32mInt[39m] = [33mVector[39m([32m3[39m, [32m6[39m, [32m9[39m, [32m12[39m, [32m15[39m, [32m18[39m, [32m21[39m, [32m24[39m, [32m27[39m, [32m30[39m)

In [3]:
def aumenta(n:Int)(v: Int): Int = n * v

(1 to 10).map(aumenta(2))
(1 to 10).map(aumenta(3))

defined [32mfunction[39m [36maumenta[39m
[36mres2_1[39m: [32mIndexedSeq[39m[[32mInt[39m] = [33mVector[39m([32m2[39m, [32m4[39m, [32m6[39m, [32m8[39m, [32m10[39m, [32m12[39m, [32m14[39m, [32m16[39m, [32m18[39m, [32m20[39m)
[36mres2_2[39m: [32mIndexedSeq[39m[[32mInt[39m] = [33mVector[39m([32m3[39m, [32m6[39m, [32m9[39m, [32m12[39m, [32m15[39m, [32m18[39m, [32m21[39m, [32m24[39m, [32m27[39m, [32m30[39m)

In [4]:
val cuatriplica = aumenta(4)(_)

(1 to 10).map(cuatriplica)

[36mcuatriplica[39m: [32mInt[39m => [32mInt[39m = ammonite.$sess.cmd3$Helper$$Lambda$2459/1076319808@c89cf46
[36mres3_1[39m: [32mIndexedSeq[39m[[32mInt[39m] = [33mVector[39m([32m4[39m, [32m8[39m, [32m12[39m, [32m16[39m, [32m20[39m, [32m24[39m, [32m28[39m, [32m32[39m, [32m36[39m, [32m40[39m)

## flatMap

In [5]:
val censo = List (
    "onka,panda,quimera", // house 1
    "monty", // house 2
    "canaima", // house 3
    "obi,max" // house 4
)

censo.map { perros => perros.split(",") }

[36mcenso[39m: [32mList[39m[[32mString[39m] = [33mList[39m([32m"onka,panda,quimera"[39m, [32m"monty"[39m, [32m"canaima"[39m, [32m"obi,max"[39m)
[36mres4_1[39m: [32mList[39m[[32mArray[39m[[32mString[39m]] = [33mList[39m(
  [33mArray[39m([32m"onka"[39m, [32m"panda"[39m, [32m"quimera"[39m),
  [33mArray[39m([32m"monty"[39m),
  [33mArray[39m([32m"canaima"[39m),
  [33mArray[39m([32m"obi"[39m, [32m"max"[39m)
)

In [6]:
censo.map { perros => perros.split(",") }.flatten

[36mres5[39m: [32mList[39m[[32mString[39m] = [33mList[39m(
  [32m"onka"[39m,
  [32m"panda"[39m,
  [32m"quimera"[39m,
  [32m"monty"[39m,
  [32m"canaima"[39m,
  [32m"obi"[39m,
  [32m"max"[39m
)

In [7]:
censo.flatMap { perros => perros.split(",") }

[36mres6[39m: [32mList[39m[[32mString[39m] = [33mList[39m(
  [32m"onka"[39m,
  [32m"panda"[39m,
  [32m"quimera"[39m,
  [32m"monty"[39m,
  [32m"canaima"[39m,
  [32m"obi"[39m,
  [32m"max"[39m
)

## reduce

In [8]:
val edades = {
    val rnd = new java.util.Random

    // 10 random ages between 0 and 74
    (1 to 10).map { _ => rnd.nextInt(75) }
}

val maximo: (Int, Int) => Int = (a, b) => math.max(a, b)
val minimo: (Int, Int) => Int = (a, b) => math.min(a, b)
val suma: (Int, Int) => Int = _ + _

edades.reduce(maximo)
edades.reduce(minimo)
edades.reduce(suma)

[36medades[39m: [32mIndexedSeq[39m[[32mInt[39m] = [33mVector[39m([32m48[39m, [32m29[39m, [32m71[39m, [32m7[39m, [32m2[39m, [32m12[39m, [32m29[39m, [32m31[39m, [32m28[39m, [32m16[39m)
[36mmaximo[39m: ([32mInt[39m, [32mInt[39m) => [32mInt[39m = ammonite.$sess.cmd7$Helper$$Lambda$2607/1053818651@69e51f04
[36mminimo[39m: ([32mInt[39m, [32mInt[39m) => [32mInt[39m = ammonite.$sess.cmd7$Helper$$Lambda$2608/120085331@388e9f34
[36msuma[39m: ([32mInt[39m, [32mInt[39m) => [32mInt[39m = ammonite.$sess.cmd7$Helper$$Lambda$2609/1320308301@49015052
[36mres7_4[39m: [32mInt[39m = [32m71[39m
[36mres7_5[39m: [32mInt[39m = [32m2[39m
[36mres7_6[39m: [32mInt[39m = [32m273[39m

In [9]:
edades.par.reduce(maximo)
edades.par.reduce(minimo)
edades.par.reduce(suma)

[36mres8_0[39m: [32mInt[39m = [32m71[39m
[36mres8_1[39m: [32mInt[39m = [32m2[39m
[36mres8_2[39m: [32mInt[39m = [32m273[39m

## Mutability and race conditions

In [10]:
val numTuplas = 8
val testTupla =
    (1 to numTuplas).map { _ => (1, 2) } ++
    (1 to numTuplas).map { _ => (2, 3) }

[36mnumTuplas[39m: [32mInt[39m = [32m8[39m
[36mtestTupla[39m: [32mIndexedSeq[39m[([32mInt[39m, [32mInt[39m)] = [33mVector[39m(
  ([32m1[39m, [32m2[39m),
  ([32m1[39m, [32m2[39m),
  ([32m1[39m, [32m2[39m),
  ([32m1[39m, [32m2[39m),
  ([32m1[39m, [32m2[39m),
  ([32m1[39m, [32m2[39m),
  ([32m1[39m, [32m2[39m),
  ([32m1[39m, [32m2[39m),
  ([32m2[39m, [32m3[39m),
  ([32m2[39m, [32m3[39m),
  ([32m2[39m, [32m3[39m),
  ([32m2[39m, [32m3[39m),
  ([32m2[39m, [32m3[39m),
  ([32m2[39m, [32m3[39m),
  ([32m2[39m, [32m3[39m),
  ([32m2[39m, [32m3[39m)
)

In [11]:
// pure function
def suma(a: Int, b: Int) = a + b

testTupla.map { case (a,b) => suma(a, b) }

defined [32mfunction[39m [36msuma[39m
[36mres10_1[39m: [32mIndexedSeq[39m[[32mInt[39m] = [33mVector[39m(
  [32m3[39m,
  [32m3[39m,
  [32m3[39m,
  [32m3[39m,
  [32m3[39m,
  [32m3[39m,
  [32m3[39m,
  [32m3[39m,
  [32m5[39m,
  [32m5[39m,
  [32m5[39m,
  [32m5[39m,
  [32m5[39m,
  [32m5[39m,
  [32m5[39m,
  [32m5[39m
)

In [12]:
testTupla.par.map { case (a,b) => suma(a, b) }

[36mres11[39m: [32mcollection[39m.[32mparallel[39m.[32mimmutable[39m.[32mParSeq[39m[[32mInt[39m] = ParVector(3, 3, 3, 3, 3, 3, 3, 3, 5, 5, 5, 5, 5, 5, 5, 5)

In [13]:
// impure function
var acumulado = 0
def acumula(a: Int) = {
    acumulado = acumulado + a
    
    acumulado
}

testTupla.map { case (a,b) => acumula(a) }

In [14]:
// reset the accumulator
acumulado = 0
// the result is not the same as the previous one (and can change on every run)
testTupla.par.map { case (a,b) => acumula(a) }

[36mres13_1[39m: [32mcollection[39m.[32mparallel[39m.[32mimmutable[39m.[32mParSeq[39m[[32mInt[39m] = ParVector(1, 2, 4, 5, 5, 6, 6, 7, 2, 3, 5, 7, 9, 11, 13, 15)

## Exception handling

In [15]:
val intText = List("1", "2", "3", "cuatro", "5", "-1")

[36mintText[39m: [32mList[39m[[32mString[39m] = [33mList[39m([32m"1"[39m, [32m"2"[39m, [32m"3"[39m, [32m"cuatro"[39m, [32m"5"[39m, [32m"-1"[39m)

In [16]:
intText.map { t => t.toInt }

java.lang.NumberFormatException: For input string: "cuatro"

In [17]:
intText.map { i =>
    try {
        i.toInt
    } catch {
        // what do I return if I can't process it? -1 cannot be told apart from a real "-1"
        case _: NumberFormatException => -1
    }
}

[36mres16[39m: [32mList[39m[[32mInt[39m] = [33mList[39m([32m1[39m, [32m2[39m, [32m3[39m, [32m-1[39m, [32m5[39m, [32m-1[39m)

In [18]:
intText.map { i => Try { i.toInt } }

[36mres17[39m: [32mList[39m[[32mTry[39m[[32mInt[39m]] = [33mList[39m(
  [33mSuccess[39m([32m1[39m),
  [33mSuccess[39m([32m2[39m),
  [33mSuccess[39m([32m3[39m),
  [33mFailure[39m(java.lang.NumberFormatException: For input string: "cuatro"),
  [33mSuccess[39m([32m5[39m),
  [33mSuccess[39m([32m-1[39m)
)

In [19]:
intText.map { i => Try { i.toInt }.toOption }
intText.flatMap { i => Try { i.toInt }.toOption }

[36mres18_0[39m: [32mList[39m[[32mOption[39m[[32mInt[39m]] = [33mList[39m(
  [33mSome[39m([32m1[39m),
  [33mSome[39m([32m2[39m),
  [33mSome[39m([32m3[39m),
  [32mNone[39m,
  [33mSome[39m([32m5[39m),
  [33mSome[39m([32m-1[39m)
)
[36mres18_1[39m: [32mList[39m[[32mInt[39m] = [33mList[39m([32m1[39m, [32m2[39m, [32m3[39m, [32m5[39m, [32m-1[39m)

# Streaming

## akka-streams
Let's move on to the akka-streams notebook!

## [apache beam](https://beam.apache.org) / [scio](https://github.com/spotify/scio) and [apache spark](https://spark.apache.org)
Both follow the same functional programming principles.
In Spark, Scala [UDFs](https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/expressions/UserDefinedFunction.html) take a function as input.

# Thank you all very much!