# Big Data: Scala + Spark

<img src="images/BigData.png">

#### Definition:

A data set characterized by its volume, velocity, and variety (the three Vs).

#### Benefits:

- Data analysis
- ETL (extraction, transformation, and loading)
- Streaming
- Processing
- Machine learning

## ETL (Extract, Transform and Load)

ETL is the process that lets organizations move data from multiple sources, reformat and clean it, and load it into another database, data mart, or data warehouse for analysis, or into another operational system to support a business process.

ETL enables data integration. In big data, we can build ETL pipelines with:

1. **Orchestrator**
2. **Scala**
3. **Spark**
4. **Continuous integration tools**

<img src="images/tools-bigdata-scala.png">
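
As a rough illustration of the three ETL stages described above, the following sketch uses only the Scala standard library. The sample rows, the `Reading` case class, and the in-memory `Map` standing in for a warehouse are all hypothetical; a real pipeline would read from and write to external systems.

```scala
object EtlSketch {
  // Hypothetical record type for the cleaned data.
  final case class Reading(country: String, lifeExpectancy: Double)

  // Extract: pretend these raw lines came from a source system.
  def extract(): Seq[String] = Seq(
    "Colombia, 67.7",
    "finland,71.9",
    "broken-row",
    "Norway, 73.2"
  )

  // Transform: split, trim, normalize names, and drop rows that do not parse.
  def transform(lines: Seq[String]): Seq[Reading] =
    lines.flatMap { line =>
      line.split(",").map(_.trim) match {
        case Array(name, value) =>
          scala.util.Try(value.toDouble).toOption.map(v => Reading(name.capitalize, v))
        case _ => None
      }
    }

  // Load: store the cleaned rows in a target keyed by country.
  def load(rows: Seq[Reading]): Map[String, Double] =
    rows.map(r => r.country -> r.lifeExpectancy).toMap

  def run(): Map[String, Double] = load(transform(extract()))
}
```

Calling `EtlSketch.run()` returns a map with three entries; the malformed row is dropped during the transform step.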


### 1. Orchestrator

It lets us build and control the workflow. Several workflow automation tools exist today and can be chosen according to your needs, for example:

- Apache Airflow
- Apache Kafka
- Apache Oozie
- AWS Step Functions

### 2. Scala

A language that combines object-oriented programming and functional programming. It is similar to Java, and demand for it has grown in recent years. A short introduction follows:

In [2]:
val a: Int = 1
val b = 2 // immutable
var c = 3 // mutable

a: Int = 1
b: Int = 2
c: Int = 3


In [3]:
b = 5 // Error: the value of a val cannot be changed

<console>: 26: error: reassignment to val

In [4]:
val d = "hello world"
val e = 'e'
val f = 123.4
val g = -5678
val h = true
val i = Array("x","y", "z")

d: String = hello world
e: Char = e
f: Double = 123.4
g: Int = -5678
h: Boolean = true
i: Array[String] = Array(x, y, z)


## Classes and Objects


<img src="images/clases-objetos.jpg">


In [5]:

class Point(val xc: Int, val yc: Int) {
   var xf: Int = xc
   var yf: Int = yc

   // Prints the current coordinates (it does not change them)
   def move(): Unit = {
      println("Point x location : " + xf)
      println("Point y location : " + yf)
   }
}

object Demo {
   def main(): Unit = {
      val pt = new Point(10, 20)
      pt.move()
   }
}

defined class Point
defined object Demo


In [6]:
Demo.main()

Point x location : 10
Point y location : 20


### Arithmetic Operators

In [7]:
var x = 10
var y = 20
println("x + y: " + (x + y) )
println("x - y: " + (x - y) )
println("x * y: " + (x * y) )
println("x / y: " + (x / y) )
println("x % y: " + (x % y) )

x + y: 30
x - y: -10
x * y: 200
x / y: 0
x % y: 10


x: Int = 10
y: Int = 20


### Relational Operators

In [8]:
var x = 50
var y = 25

println("x is equal to y: " + (x == y) )
println("x is not equal to y: " + (x != y) )
println("x is greater than y: " + (x > y) )
println("x is less than y: " + (x < y) )
println("x is greater than or equal to y: " + (x >= y) )
println("x is less than or equal to y: " + (x <= y) )

x is equal to y: false
x is not equal to y: true
x is greater than y: true
x is less than y: false
x is greater than or equal to y: true
x is less than or equal to y: false


x: Int = 50
y: Int = 25


### Logical Operators

In [9]:
var x = true
var y = false
var z = false

println("x AND y: " + (x && y) )
println("x OR y: " + (x || y) )
println("NOT x : " + (!x) )

x AND y: false
x OR y: true
NOT x : false


x: Boolean = true
y: Boolean = false
z: Boolean = false


In [10]:
var x = 1
x += 2 
print("x = x + 2 = " + x)

x = x + 2 = 3

x: Int = 3


### Conditionals

In [11]:
var x = 2.95
if (x >= 2.95) println("Passed")
else println("Failed")

Passed


x: Double = 2.95


#### Switch/Case in Scala (pattern matching)

In [12]:
var y = 1

y match {
   case 1 => "One"
   case 2 => "Two"
   case 3 => "Three"
   case 4 => "Four"
}

y: Int = 1
res6: String = One
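
The match above has no default case, so any value other than 1 to 4 would throw a `scala.MatchError` at runtime. A minimal sketch of a safer variant uses the wildcard pattern `_` as the default; the `MatchSketch` object and `describe` function are names made up for this example.

```scala
object MatchSketch {
  // Map a number to its name; `case _` catches every value not listed above it.
  def describe(n: Int): String = n match {
    case 1 => "One"
    case 2 => "Two"
    case 3 => "Three"
    case 4 => "Four"
    case _ => "Unknown"
  }
}
```

With this version, `MatchSketch.describe(7)` returns `"Unknown"` instead of failing.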


### Loops

In [13]:
for (x <- 1 until 10) {
    println("Value of x: " + x)
}

Value of x: 1
Value of x: 2
Value of x: 3
Value of x: 4
Value of x: 5
Value of x: 6
Value of x: 7
Value of x: 8
Value of x: 9


In [14]:
val xList = List(1,2,3,4,5)

for( x <- xList ){
    println( "Value of x: " + x )
}

Value of x: 1
Value of x: 2
Value of x: 3
Value of x: 4
Value of x: 5


xList: List[Int] = List(1, 2, 3, 4, 5)
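
Note that `until` excludes the upper bound (which is why the first loop stops at 9), while `to` includes it. A `for` with `yield` builds a new collection instead of printing. A small sketch of both; `LoopSketch` and its member names are made up for the example.

```scala
object LoopSketch {
  // `1 until 5` excludes 5; `1 to 5` includes it.
  val exclusive: List[Int] = (1 until 5).toList // List(1, 2, 3, 4)
  val inclusive: List[Int] = (1 to 5).toList    // List(1, 2, 3, 4, 5)

  // for/yield returns a new collection; the guard keeps only even values.
  val evenSquares: List[Int] =
    for (x <- inclusive if x % 2 == 0) yield x * x // List(4, 16)
}
```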


### 3. Spark

In [15]:
// Create DataFrames (DataFrames are immutable)
val emptyDf = spark.emptyDataFrame
val namesDf = Seq(("Juan", "Perez"),("Andres" , "Rueda")).toDF("first_name", "last_name")

emptyDf: org.apache.spark.sql.DataFrame = []
namesDf: org.apache.spark.sql.DataFrame = [first_name: string, last_name: string]


In [16]:
// Show the data
emptyDf.show()
namesDf.show()
namesDf.show(1)

++
||
++
++

+----------+---------+
|first_name|last_name|
+----------+---------+
|      Juan|    Perez|
|    Andres|    Rueda|
+----------+---------+

+----------+---------+
|first_name|last_name|
+----------+---------+
|      Juan|    Perez|
+----------+---------+
only showing top 1 row



In [17]:
// Print the schema
emptyDf.printSchema()
namesDf.printSchema()

root

root
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)



In [18]:
// List the columns
namesDf.columns

res11: Array[String] = Array(first_name, last_name)


In [19]:
// Load a CSV file into a DataFrame
val world_happiness = spark.read.format("csv").option("header", "true").option("delimiter", ",").load("input/2020.csv")

world_happiness: org.apache.spark.sql.DataFrame = [Country name: string, Regional indicator: string ... 18 more fields]


In [20]:
// Print the number of records
println("--------------- Number of records ---------------")
println(world_happiness.count())
println("--------------- DataFrame schema ---------------")
world_happiness.printSchema()

--------------- Number of records ---------------
153
--------------- DataFrame schema ---------------
root
 |-- Country name: string (nullable = true)
 |-- Regional indicator: string (nullable = true)
 |-- Ladder score: string (nullable = true)
 |-- Standard error of ladder score: string (nullable = true)
 |-- upperwhisker: string (nullable = true)
 |-- lowerwhisker: string (nullable = true)
 |-- Logged GDP per capita: string (nullable = true)
 |-- Social support: string (nullable = true)
 |-- Healthy life expectancy: string (nullable = true)
 |-- Freedom to make life choices: string (nullable = true)
 |-- Generosity: string (nullable = true)
 |-- Perceptions of corruption: string (nullable = true)
 |-- Ladder score in Dystopia: string (nullable = true)
 |-- Explained by: Log GDP per capita: string (nullable = true)
 |-- Explained by: Social support: string (nullable = true)
 |-- Explained by: Healthy life expectancy: string (nullable = true)
 |-- Explained by: Freedom to make li

In [28]:
val expectancy_life_healty_sin_cast = world_happiness.select(
    $"Country name",
    $"Regional indicator",
    $"Healthy life expectancy"
)
expectancy_life_healty_sin_cast.show()

+--------------+--------------------+-----------------------+
|  Country name|  Regional indicator|Healthy life expectancy|
+--------------+--------------------+-----------------------+
|       Finland|      Western Europe|             71.9008255|
|       Denmark|      Western Europe|            72.40250397|
|   Switzerland|      Western Europe|            74.10244751|
|       Iceland|      Western Europe|                     73|
|        Norway|      Western Europe|            73.20078278|
|   Netherlands|      Western Europe|            72.30091858|
|        Sweden|      Western Europe|            72.60076904|
|   New Zealand|North America and...|            73.20262909|
|       Austria|      Western Europe|            73.00250244|
|    Luxembourg|      Western Europe|            72.59999847|
|        Canada|North America and...|            73.60160065|
|     Australia|North America and...|            73.60453796|
|United Kingdom|      Western Europe|            72.30160522|
|       

expectancy_life_healty_sin_cast: org.apache.spark.sql.DataFrame = [Country name: string, Regional indicator: string ... 1 more field]


In [22]:
// Filter and select on a DataFrame
import org.apache.spark.sql._

// Print the values of the selected column
world_happiness.select($"Country name").show()

val expectancy_life = world_happiness.select(
    $"Country name",
    $"Regional indicator",
    $"Healthy life expectancy".cast("double")
)

world_happiness.filter(
    $"Country name" === "Colombia" 
).select(
    $"Healthy life expectancy"
).show()

world_happiness.filter(
    $"Country name" === "Colombia"
).select(
    $"Country name",
    $"Healthy life expectancy"
).show()

+--------------+
|  Country name|
+--------------+
|       Finland|
|       Denmark|
|   Switzerland|
|       Iceland|
|        Norway|
|   Netherlands|
|        Sweden|
|   New Zealand|
|       Austria|
|    Luxembourg|
|        Canada|
|     Australia|
|United Kingdom|
|        Israel|
|    Costa Rica|
|       Ireland|
|       Germany|
| United States|
|Czech Republic|
|       Belgium|
+--------------+
only showing top 20 rows

+-----------------------+
|Healthy life expectancy|
+-----------------------+
|            67.69958496|
+-----------------------+

+------------+-----------------------+
|Country name|Healthy life expectancy|
+------------+-----------------------+
|    Colombia|            67.69958496|
+------------+-----------------------+



import org.apache.spark.sql._
expectancy_life: org.apache.spark.sql.DataFrame = [Country name: string, Regional indicator: string ... 1 more field]


In [23]:
expectancy_life.printSchema()

root
 |-- Country name: string (nullable = true)
 |-- Regional indicator: string (nullable = true)
 |-- Healthy life expectancy: double (nullable = true)
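
Casting column by column works, but the reader can also try to infer the types while loading. The sketch below is a notebook fragment, not a standalone program: it assumes the same `spark` session and the same `input/2020.csv` file used above, and the name `worldHappinessTyped` is only illustrative. Keep in mind that `inferSchema` makes Spark read the data an extra time, which can be costly on large files.

```scala
// Requires an active SparkSession named `spark`, as in the cells above.
val worldHappinessTyped = spark.read
  .option("header", "true")
  .option("inferSchema", "true") // let Spark guess numeric column types
  .csv("input/2020.csv")

// Numeric columns should now be reported with a numeric type instead of string.
worldHappinessTyped.printSchema()
```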



In [24]:
// Grouping
expectancy_life.groupBy($"Regional indicator").count().show(false)

+----------------------------------+-----+
|Regional indicator                |count|
+----------------------------------+-----+
|South Asia                        |7    |
|Middle East and North Africa      |17   |
|North America and ANZ             |4    |
|Sub-Saharan Africa                |39   |
|East Asia                         |6    |
|Commonwealth of Independent States|12   |
|Latin America and Caribbean       |21   |
|Western Europe                    |21   |
|Central and Eastern Europe        |17   |
|Southeast Asia                    |9    |
+----------------------------------+-----+



In [25]:
// Group by Regional indicator and take the maximum life expectancy
expectancy_life.groupBy(
    $"Regional indicator"
).agg(
    max($"Healthy life expectancy").as("max life expectancy")
).show(false)

+----------------------------------+-------------------+
|Regional indicator                |max life expectancy|
+----------------------------------+-------------------+
|South Asia                        |70.59999847        |
|Middle East and North Africa      |73.20025635        |
|North America and ANZ             |73.60453796        |
|Sub-Saharan Africa                |66.40434265        |
|East Asia                         |76.77170563        |
|Commonwealth of Independent States|66.75065613        |
|Latin America and Caribbean       |71.29985046        |
|Western Europe                    |74.40270996        |
|Central and Eastern Europe        |71.1029892         |
|Southeast Asia                    |76.80458069        |
+----------------------------------+-------------------+



In [27]:
// Add a column
val expectancy_life_2020 = expectancy_life.withColumn("year",lit("2020").cast("Integer"))
expectancy_life_2020.show()

+--------------+--------------------+-----------------------+----+
|  Country name|  Regional indicator|Healthy life expectancy|year|
+--------------+--------------------+-----------------------+----+
|       Finland|      Western Europe|             71.9008255|2020|
|       Denmark|      Western Europe|            72.40250397|2020|
|   Switzerland|      Western Europe|            74.10244751|2020|
|       Iceland|      Western Europe|                   73.0|2020|
|        Norway|      Western Europe|            73.20078278|2020|
|   Netherlands|      Western Europe|            72.30091858|2020|
|        Sweden|      Western Europe|            72.60076904|2020|
|   New Zealand|North America and...|            73.20262909|2020|
|       Austria|      Western Europe|            73.00250244|2020|
|    Luxembourg|      Western Europe|            72.59999847|2020|
|        Canada|North America and...|            73.60160065|2020|
|     Australia|North America and...|            73.60453796|2

expectancy_life_2020: org.apache.spark.sql.DataFrame = [Country name: string, Regional indicator: string ... 2 more fields]


In [29]:
val citiesDf = Seq(("Colombia", "Bogota"),("Colombia" , "Medellin")).toDF("Country name", "City")


citiesDf: org.apache.spark.sql.DataFrame = [Country name: string, City: string]


In [31]:

// Join types include inner, left, right, outer, leftsemi, and leftanti
val citiesAndCountries = expectancy_life_2020.as("table1").join(
    citiesDf.as("table2"), 
    $"table1.Country name" === $"table2.Country name",
    "inner"
).select(
    $"table1.Country name".as("country"),
    $"table1.Regional indicator".as("regional"),
    $"table2.City"
)


citiesAndCountries.show()

+--------+--------------------+--------+
| country|            regional|    City|
+--------+--------------------+--------+
|Colombia|Latin America and...|Medellin|
|Colombia|Latin America and...|  Bogota|
+--------+--------------------+--------+



citiesAndCountries: org.apache.spark.sql.DataFrame = [country: string, regional: string ... 1 more field]


In [32]:
// Write CSV, Avro, or Parquet
citiesAndCountries.write.mode("overwrite").parquet("parquet")
// Avro needs the external spark-avro module on the classpath; without it this line fails as shown below
citiesAndCountries.write.format("com.databricks.spark.avro").mode("overwrite").save("avro")
citiesAndCountries.write.option("header", "true").csv("CSV")

java.lang.ClassNotFoundException:  Failed to find data source: org.apache.spark.sql.avro.AvroFileFormat. Please find packages at http://spark.apache.org/third-party-projects.html
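
The `ClassNotFoundException` above is what Spark typically reports when the Avro data source is requested but the external `spark-avro` module is not on the classpath. One possible fix is sketched below as a notebook fragment. It assumes the session is started with that package; the artifact coordinates use placeholders for the Scala and Spark versions, which must match your installation.

```scala
// Start the shell or kernel with the Avro module, for example:
//   spark-shell --packages org.apache.spark:spark-avro_<scala-version>:<spark-version>
// With the module available, the short format name "avro" can be used:
citiesAndCountries.write.format("avro").mode("overwrite").save("avro")

// Adding mode("overwrite") to the CSV write lets the cell be re-run
// without failing because the output directory already exists.
citiesAndCountries.write.mode("overwrite").option("header", "true").csv("CSV")
```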

### Shuffle

A shuffle is the redistribution of data across different nodes or machines. Operations such as join and groupBy increase the amount of shuffling.
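
To make the idea concrete, here is a toy model of a shuffle in plain Scala (no Spark involved). Records start out spread over two hypothetical input partitions and are regrouped so that all records with the same key end up in the same output partition, which is roughly what a `groupBy` needs before it can aggregate. The hash-modulo rule only illustrates the idea of hash partitioning; it is not Spark's implementation.

```scala
object ShuffleSketch {
  // Pick an output partition for a key (non-negative even for negative hash codes).
  def partitionFor(key: String, numPartitions: Int): Int =
    Math.floorMod(key.hashCode, numPartitions)

  // Move every (key, value) record to the partition chosen for its key.
  def shuffle(
      inputPartitions: Seq[Seq[(String, Int)]],
      numPartitions: Int
  ): Map[Int, Seq[(String, Int)]] =
    inputPartitions.flatten.groupBy { case (key, _) => partitionFor(key, numPartitions) }

  // Two input partitions with keys mixed across them.
  val input: Seq[Seq[(String, Int)]] = Seq(
    Seq("Western Europe" -> 1, "South Asia" -> 1),
    Seq("South Asia" -> 1, "Western Europe" -> 1, "East Asia" -> 1)
  )

  // After the shuffle, counting per key is a local operation in each partition.
  def countsPerKey(numPartitions: Int): Map[String, Int] =
    shuffle(input, numPartitions).values.flatMap { records =>
      records.groupBy(_._1).map { case (key, rs) => key -> rs.map(_._2).sum }
    }.toMap
}
```

Every record has to be looked at and possibly moved, which is why shuffles are expensive on a real cluster where moving a record means sending it over the network.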



### Best practices

- Check how a DataFrame will be executed by using the explain() command. Spark 3 improves this feature.

In [33]:
citiesAndCountries.explain()

== Physical Plan ==
*(1) Project [Country name#34 AS country#278, Regional indicator#35 AS regional#279, City#234]
+- *(1) BroadcastHashJoin [Country name#34], [Country name#233], Inner, BuildRight
   :- *(1) Project [Country name#34, Regional indicator#35]
   :  +- *(1) Filter isnotnull(Country name#34)
   :     +- *(1) FileScan csv [Country name#34,Regional indicator#35] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/C:/Users/jespitiaa/Curso_Scala/input/2020.csv], PartitionFilters: [], PushedFilters: [IsNotNull(Country name)], ReadSchema: struct<Country name:string,Regional indicator:string>
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
      +- LocalTableScan [Country name#233, City#234]


- Know the tools you work with and, where possible, review Spark's configuration settings: http://spark.apache.org/docs/latest/sql-performance-tuning.html

In [34]:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext.getConf("spark.sql.shuffle.partitions")
sqlContext.getConf("spark.sql.autoBroadcastJoinThreshold")

sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@5e2b623
res23: String = 10485760
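
The same settings can also be read, and changed for the current session, through `spark.conf` without creating an `SQLContext`. A short notebook fragment, assuming the `spark` session from the cells above; the value `"8"` is an arbitrary example, not a recommendation.

```scala
// Read the current values (returned as strings).
val shufflePartitions = spark.conf.get("spark.sql.shuffle.partitions")
val broadcastThreshold = spark.conf.get("spark.sql.autoBroadcastJoinThreshold")

// Change a setting for this session only.
spark.conf.set("spark.sql.shuffle.partitions", "8")
```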


- Avoid count() unless you actually need the number of rows.
- When you need to group, consider which function to use: groupBy, groupByKey, reduceByKey, treeReduce/treeAggregate.
- When joining, examine the data to choose a strategy such as broadcast, sort merge, shuffle hash, or Cartesian. Spark 3 has optimized how these are used.

In [35]:
val citiesAndCountries = expectancy_life_2020.as("table1").join(
    broadcast(citiesDf).as("table2"), 
    $"table1.Country name" === $"table2.Country name",
    "inner"
).select(
    $"table1.Country name".as("country"),
    $"table1.Regional indicator".as("regional"),
    $"table2.City"
)

citiesAndCountries: org.apache.spark.sql.DataFrame = [country: string, regional: string ... 1 more field]


- Depending on the complexity of the transformations and the available resources, you can keep some data in memory. https://sparkbyexamples.com/spark/spark-dataframe-cache-and-persist-explained/
- Serialization matters in distributed applications.
- Avoid cross joins.
- Communicate across the team (data engineers, architects, DevOps, data scientists, data governance).
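
As a sketch of the caching advice in the list above, a DataFrame that is reused several times can be kept in memory and released when it is no longer needed. This notebook fragment assumes the `expectancy_life_2020` DataFrame and the session from the earlier cells; the storage level shown is one common choice among several, and `cached` is an illustrative name.

```scala
import org.apache.spark.storage.StorageLevel

// Keep the DataFrame in memory, spilling to disk if it does not fit.
val cached = expectancy_life_2020.persist(StorageLevel.MEMORY_AND_DISK)

// The first action materializes the cache; the second one reuses it.
cached.groupBy($"Regional indicator").count().show(false)
cached.filter($"Country name" === "Colombia").show()

// Free the memory when done.
cached.unpersist()
```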