In [0]:
import org.apache.spark.sql.types._
val myDataSchema = StructType(
    Array(
        StructField("id_persona", DecimalType(26, 0), true), 
        StructField("anio", IntegerType, true), 
        StructField("mes", IntegerType, true), 
        StructField("provincia", IntegerType, true), 
        StructField("canton", IntegerType, true), 
        StructField("area", StringType, true), 
        StructField("genero", StringType, true), 
        StructField("edad", IntegerType, true), 
        StructField("estado_civil", StringType, true), 
        StructField("nivel_de_instruccion", StringType, true), 
        StructField("etnia", StringType, true), 
        StructField("ingreso_laboral", IntegerType, true), 
        StructField("condicion_actividad", StringType, true), 
        StructField("sectorizacion", StringType, true), 
        StructField("grupo_ocupacion", StringType, true), 
        StructField("rama_actividad", StringType, true), 
        StructField("factor_expansion", DoubleType, true)
    ));

In [1]:
val data = spark
  .read
  .schema(myDataSchema)
//  .option("inferSchema", true)
  .option("header", "true")
  .option("delimiter", "\t")
  .csv("/Users/jorgaf/Documents/Clases/Abril-Agosto2020/Presencial/ProgramacionAvanzada/Proyecto2doBim/Datos_ENEMDU_PEA_v2.csv");

In [2]:
data.schema

# Transformaciones, Acciones y evaluación diferida
Las operaciones de Spark en un *DataFrame* (datos distribuidos) se pueden clasificar en dos tipos: *transformaciones* y *acciones*.


## Transformaciones
Transforman un DataFrame en un nuevo DataFrame, sin alterar los datos originales ya que estos son inmutables. En pocas palabras, algunas operaciones, tales como *select()* y *filter()* no cambian el DataFrame original, en su lugar, devuelven los resultados de la operación como un nuevo DataFrame.

Todas las *transformaciones* son evaluadas de forma diferida, es decir, sus resultados no se calculan inmediatamente, sino que se registran en un objeto, denominado *lineage*, que es un grafo, que permite a spark optimizar su ejecución, gracias a un plan de ejecución que reorganiza ciertas transformaciones, fusiona u optimiza otras.

La evaluación diferida es la estrategia de Spark para retrasar la ejecución hasta que se invoque una *acción*. Generalmente una acción devuelve un valor.Col

## Acciones
Una acción dispara la evaluación diferida de todas las transformaciones

|Transformaciones|Acciones|
|----------------|--------|
|orderBy         |show    |
|groupBy         |take    |
|filter          |count   |
|select          |collect |
|join            |save    |

## Columnas y expresiones
Tal como se mencionó anteriormente un DataFrame de spark posee columnas y filas, que conceptualmente son similares a una tabla de un motor de base de datos.

Dentro Apache Spark las columnas de un DataFrame son como objetos que poseen métodos y como tales es posible manipular los valores de las columnas con expresiones o una serie de cálculos matemáticos.

Para escoger una columna se puede usar las palabras *col* o *columna*

Ejemplos:
1. Mostrar todas las columnas
```scala
data.columns
```
2. Seleccionar una columna y obtener una objeto columna (Column)
```scala
data.col("ingreso_laboral")
```

3. Duplicar el ingreso por actividades laborales
```scala
data.select(col("ingreso_laboral") * 2).show(2)
```

También es posible crear expresiones utilizando la sentencia *expr*. Se pueden crear expresiones simples tales como: *expr("column_name * 5")* o *(expr("column_name - 5") > col("anothercolumnname"))*

4. Duplicar el ingreso por las actividades laborales
```scala
data.select(expr("ingreso_laboral * 2")).show(2)
```

También, es posible crear una nueva columna utilizando el resultado de una expresión. Por ejemplo: crear una nueva columna con valores verdadero o falso si el ingreso de una persona es mayor que el salario mínimo vital ($400).

5. Crear una nueva columna con valores verdadero o falso si el ingreso de una persona es mayor que el salario mínimo vital ($400). 
```scala
data.withColumn("MayorSMV", (expr("ingreso_laboral > 400"))).show()
```

Un ejemplo del uso de los métodos de las columnas.

6. Ordenar de forma descendente por el ingreso laboral.
```
data.sort(col("ingreso_laboral").desc).show;
data.sort($"ingreso_laboral".desc).show;
```

Otro ejemplo. Cambiar el nombre de una columna.

7. Cambiar el nombre de la columna ingreso_laboral por sueldo.
```scala
data.select($"ingreso_laboral".as("sueldo")).show()
```

Hasta aquí, únicamente se ha revisado la superficie de los métodos de las columnas, para mayor detalle se debe revisar: https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/Column.html

## Operaciones básicas con DataFrames
### Proyecciones y filtros
Dentro del argot de las bases de datos relacionales, una proyección es la habilidad de **seleccionar** todas o ciertas columnas y cuyas filas cumplen con una condición que se expresa a través de filtros. En Spark, una proyección se hace usando el método *select()*, mientras que un filtro puede hacerse utilizando los métodos *filter()* o *where()*.

Un ejemplo, seleccionar las columnas etnia, áreas e ingreso laboral en donde la etnia sea diferente de mestizo
```scala
data.select(col("etnia"), col("area"), $"ingreso_laboral").where(column("etnia") =!= "6 - Mestizo").show
```

También es posible seleccionar los valores únicos de una columna. Por ejemplo, mostrar los valores únicos de la columna *grupo_ocupacion* que sean diferentes de null y ordenarlos alfabéticamente.
```scala
data.select("grupo_ocupacion").distinct.where($"grupo_ocupacion" =!= "null").sort("grupo_ocupacion").show(false)
```
Usando el mismo ejemplo, anterior, ahora se pide contar los valores únicos de la columna *grupo_ocupacion* que sean diferentes de *null*.
```scala
data.select("grupo_ocupacion").distinct.where($"grupo_ocupacion".isNotNull).count
```

### Renombrando, agregando o borrando columnas
Es posible renombrar columnas por diferentes circunstancias como por ejemplo: estilos o estándares de nombrado. Por ejemplo, la columna *nivel_de_instruccion* se podría cambiar de nombre por uno más orientado a la programación, algo así NivelDeInstruccion.
```scala
val data2 = data.withColumnRenamed("nivel_de_instruccion", "nivelDeInstruccion")
data2.select("nivelDeInstruccion").distinct.show
```

No olvide que un DataFrame es inmutable, es decir no puede cambiar un valor, es por ello, que la acción anterior genera una nuevo DataFrame que en este caso se llama *data2* y la segunda sentencia se utiliza para mostrar que en realidad se hizo el cambio.

Para agregar columnas se puede utilzar la sentencia que ya se usó anteriormente, aquí el ejemplo adaptada para el DataFrame *data2*:
```scala
val data3 = data2.withColumn("MayorSMV", (expr("ingreso_laboral > 400")))
data3.show
```
La anterior sentencia, creará una columna que se llamará *MayorSMV* cuyos valores serán booleanos (true|false) si se cumple la condición de la expresión.

Finalmente, para borrar una columna, se debe usar el método drop y enviar como parámetro el nombre de la columna a eliminar. Para nuestro DataFrame (*data3*), la columna *id_persona* no es necesaria ya que para nuestro propósito (EDA), no tiene información relevante. Para eliminarla se debe hacer lo siguiente:
```scala
val data4 = data3.drop("id_persona")
data4.schema
```

### Agregaciones
Ahora trate de responder a la pregunta ¿cuál es la rama de actividad más frecuente en el conjunto de datos? Para buscar su respuesta es necesario usar agregaciones que son métodos que permiten agregar operaciones.
```scala
data4.select("rama_actividad").where($"rama_actividad".isNotNull).groupBy("rama_actividad").count().orderBy(desc("count")).show(false)
```

Existen otras funciones de origen estadístico que se puede implementar y que se estudiarán más adelante, tales como: *min()*, *max()*, *sum()*, *avg()*, etc