# Desarrollo
## Carga del DataFrame

In [1]:
val data = spark
    .read
    .option("inferSchema", "true")
    .option("header", "true")
    .option("delimiter", "\t")
    .csv("/Users/rjmasache/Documents/course4/pAdvance/secondPeriod/project/Datos_ENEMDU_PEA_v2.csv")

Definición del esquema de forma programable

In [3]:
import org.apache.spark.sql.types._
val dataSchema = StructType(
    Array(
        StructField("id", DecimalType(26, 0), true),
        StructField("anio", IntegerType, true),
        StructField("mes", IntegerType, true),
        StructField("provincia", IntegerType, true),
        StructField("canton", IntegerType, true),
        StructField("area", StringType, true),
        StructField("genero", StringType, true),
        StructField("edad", IntegerType, true),
        StructField("estado", StringType, true),
        StructField("instruccion", StringType, true),
        StructField("etnia", StringType, true),
        StructField("ingreso", IntegerType, true),
        StructField("condicion", StringType, true),
        StructField("sectorizacion", StringType, true),
        StructField("ocupacion", StringType, true),
        StructField("rama", StringType, true),
        StructField("factor", DoubleType, true)
    ));

Envío del esquema

In [5]:
val data = spark
    .read
    .schema(dataSchema)
    .option("header", "true")
    .option("delimiter", "\t")
    .csv("/Users/rjmasache/Documents/course4/pAdvance/secondPeriod/project/Datos_ENEMDU_PEA_v2.csv");


Carga de DataFrames y uso de Inner join para unir las columnas **provincia** y **canton** con sus nombres respectivos

In [7]:
val province = spark
  .read
  .option("InferSchema","true")
  .option("header","true")
  .option("delimiter","\t")
  .csv("/Users/rjmasache/Documents/course4/pAdvance/secondPeriod/project/provincias.csv")

In [8]:
val canton = spark
  .read
  .option("InferSchema","true")
  .option("header","true")
  .option("delimiter","\t")
  .csv("/Users/rjmasache/Documents/course4/pAdvance/secondPeriod/project/cantones.csv")

In [9]:
val innerProvince = data.join(province, data("provincia") === province("codigoProvincia"), "inner")

In [10]:
// z.show(innerProvince.select("provincia", "codigoProvincia", "nombreProvincia").orderBy($"provincia").distinct)

In [11]:
val innerCanton = innerProvince.join(canton, innerProvince("canton") === canton("codigoCanton"), "inner")

In [12]:
// z.show(innerCanton)

Eliminación de columnas repetidas para la obtención del DataFrame final

In [14]:
val finalData = innerCanton.drop("provincia", "canton")
// z.show(finalData)


## Análisis de las variables del DataFrame basado en cuestiones

**Pregunta 1**: ¿Cuántas personas de etnia *Indígena* podemos encontrar en la rama *P enseñanza*?

In [17]:
finalData.where($"rama" === "16 - P. Enseñanza" && $"etnia" === "1 - Indígena").groupBy("etnia").count().show()

**Pregunta 2**: ¿Cuántas mujeres pertenecen a la rama *F. construccion*?

In [19]:
finalData.where($"genero" === "2 - Mujer" && $"rama" === "06 - F. Construcción").count()

**Pregunta 3**: ¿Cuál es la media de edad de trabajadores que estan en la rama *No especificado*?

In [21]:
val mediaEdadNE = finalData.select("edad").where($"rama" === "22 - No especificado")
mediaEdadNE.select(mean("edad")).show()

**Pregunta 4**: ¿Cuál es el sueldo mínimo y máximo que se gana en la rama *A. Agricultura, ganadería caza y silvicultura y pesca*?

In [23]:
val sueldo = finalData.select("ingreso").where($"rama" === "01 - A. Agricultura, ganadería caza y silvicultura y pesca")
sueldo.select(min("ingreso") as ("Sueldo mínimo"), max("ingreso") as ("Sueldo máximo")).show()

**Pregunta 5**: ¿Cuántas personas que trabajan en la rama  de *Actividades profesionales, científicas y técnicas* poseen una instrucción *Superior Universitario*?

In [25]:
val higher = finalData.select("rama", "instruccion")
    .where($"rama" === "13 - M. Actividades profesionales, científicas y técnicas" && $"instruccion" === "09 - Superior Universitario")
higher.groupBy("rama").count().show(false)

**Pregunta 6**: ¿Qué cantidad y qué porcentaje ocupan las personas *profesionales científicos e intelectuales* dentro de la rama de *Actividades profesionales, científicas y técnicas*?

In [27]:
val group = finalData.where($"rama" === "13 - M. Actividades profesionales, científicas y técnicas" && $"ocupacion" === "02 - Profesionales científicos e intelectuales")
printf("Cantidad: %d\n", group.count())
val total = (group.count() / finalData.where($"rama" === "13 - M. Actividades profesionales, científicas y técnicas").count().toDouble) * 100
printf("El porcentaje es de %.2f %%\n", total)

**Pregunta 7**: ¿Cuántas personas pertenecientes a la rama de *Actividades profesionales, científicas y técnicas* tienen un *Post-grado*, son solteros y sueldo superior al rango establecido?

In [29]:
// Verificar si existen valores nulos en los datos
finalData.select("ingreso").groupBy("ingreso").count().sort($"count".desc).show(5)
// Seleccionar los datos que no poseen valores nulos
val notNull = finalData.select("ingreso").where($"ingreso".isNotNull)


**Obtener los cuartiles**

In [31]:
finalData.select("ingreso").summary().show()
val quartile = notNull.stat.approxQuantile("ingreso", Array(0.25, 0.75), 0.0)
val q1 = quartile(0)
val q3 = quartile(1)

**Calcular el rango intercuartil (IQR)**


In [33]:
val iqr = q3 - q1

**Calcular los límites**


In [35]:
val lowerIQR = q1 - 1.5 * iqr
val higherIQR = q3 + 1.5 * iqr

**Filtrar inferiores y superiores**


In [37]:
val lowerValue = notNull.where($"ingreso" < lowerIQR)
lowerValue.summary().show()

In [38]:
val higherValue = notNull.where($"ingreso" > higherIQR)
higherValue.summary().show()

In [39]:
val dataIQR = notNull.where($"ingreso" > lowerIQR && $"ingreso" < higherIQR)
dataIQR.select("ingreso").summary().show()

**Uso del valor de referencia**

In [41]:
val success = finalData.filter($"rama" === "13 - M. Actividades profesionales, científicas y técnicas" && $"instruccion" === "10 - Post-grado")
success.where($"ingreso" > 1214 && $"estado"=== "6 - Soltero(a)").groupBy("rama").count().show(false)

**Pregunta 8**: ¿Cuál es la cantidad de personas que trabajan dentro de la rama *Actividades profesionales, científicas y técnicas* dentro de cada provincia?

In [43]:
val quantity = finalData.select("rama", "nombreProvincia").where($"rama" === "13 - M. Actividades profesionales, científicas y técnicas")
quantity.groupBy("nombreProvincia").count().sort(desc("count")).show(24)

**Pregunta 9**: ¿Cuál es el número de personas que pertencen a la rama *Agricultura, ganadería, caza, silvicultura y pesca* dentro de la provincia de Loja?

In [45]:
EfinalData.where($"rama" === "01 - A. Agricultura, ganadería caza y silvicultura y pesca" && $"nombreProvincia" === "Loja")
    .groupBy("nombreProvincia").count().show(false)

**Pregunta 10**: ¿Cuál es la edad mínima para trabajar en la rama de *Información y comunicación*?

In [47]:
val minEdad = finalData.select("edad").where($"rama" === "10 - J. Información y comunicación")
minEdad.select(min("edad")).show()

**Pregunta 11**: ¿Cuántas mujeres  pertenencen a la rama de *Transporte y almacenamiento*?

In [49]:
finalData.select("genero").where($"rama" === "08 - H. Transporte y almacenamiento" && $"genero" === "2 - Mujer" ).count()

**Pregunta 12**: ¿Cuántos trabajadores hubo dentro del área *Actividades financieras y de seguros* en el año 2015?

In [51]:
finalData.select("anio").where($"rama" === "11 - K. Actividades financieras y de seguros" && $"anio" === "2015").count()

## Análisis detallado y visualización de las cuestiones presentadas

### Visualización 1
* ¿Cuál es la frecuencia de hombres y mujeres existentes en las distintas categorías de nivel de estudio?

In [54]:
z.show(finalData.groupBy("rama").pivot("genero").count().orderBy("rama"))

### Visualización 2
* ¿Cuál es la actividad a la que se dedican las personas que son de etnia *Indígena*?

In [56]:
val actividad = finalData.where($"etnia" === "1 - Indígena").groupBy("rama").count.orderBy($"count".desc)
z.show(actividad.select($"rama" as ("Actividad"), $"count" as ("Total")));

### Visualización 3
* ¿Cuál es la media de edad en cada área?

In [58]:
val meanEdad = finalData.groupBy("rama").agg(round(mean("edad")).cast(IntegerType)).orderBy("CAST(round(avg(edad), 0) AS INT)")
z.show(meanEdad.select($"rama" as ("Actividad"), $"CAST(round(avg(edad), 0) AS INT)" as ("Edad Media")));


### Visualización 4
* ¿Cuál es el cambio de sueldo según los años en la rama *A. Agricultura, ganadería caza y silvicultura y pesca*?

In [60]:
z.show(data.where($"rama" === "01 - A. Agricultura, ganadería caza y silvicultura y pesca")
    .groupBy("anio").pivot("genero").agg(round(avg("ingreso")).cast(IntegerType)).orderBy("anio"));

### Visualización 5
* Ampliación de la columna **instruccion** para una mejor obtención de información

In [62]:
val higher2 = finalData.select("rama", "instruccion").where($"rama" === "13 - M. Actividades profesionales, científicas y técnicas")
z.show(higher2.groupBy("rama").pivot("instruccion").count().orderBy("rama"))

### Visualización 6
* Ampliación y cálculo del porcentaje en función de la variable **occupacion** y condición elegidas

In [64]:
val group2 = data.select("rama", "ocupacion").where($"rama" === "13 - M. Actividades profesionales, científicas y técnicas")
z.show(group2.groupBy("rama").pivot("ocupacion").count().orderBy("rama"))



### Visualización 7
* Ampliación de variable elegida para un resumen de datos más comprensible

In [66]:
val success2 = finalData.select("rama", "instruccion", "estado")
    .where($"instruccion" === "10 - Post-grado" && $"ingreso" > 1214 && $"estado" === "6 - Soltero(a)").sort(desc("rama"))
z.show(success2.groupBy("rama", "estado").pivot("instruccion").count().orderBy("rama"))


### Visualización 8
* Visualización de la cantidad de personas que trabajan dentro de la rama *Actividades profesionales, científicas y técnicas* dentro de cada provincia

In [68]:
z.show(quantity.groupBy("nombreProvincia").pivot("rama").count().orderBy("nombreProvincia"))

### Visualización 9
* Ingreso máximo por el estado civil

In [70]:
val maxIngreso = finalData.groupBy("anio").pivot("estado").max("ingreso").orderBy("anio")
z.show(maxIngreso)

### Visualización 10
* Ingresos máximos según género

In [72]:
val maxGenero = finalData.groupBy("anio").pivot("genero").max("ingreso").orderBy("anio")
z.show(maxGenero)

### Visualización 11
* Promedio de ingreso según las etnias

In [74]:
val etnias = finalData.select($"etnia", $"ingreso").groupBy("etnia").avg("ingreso").sort(desc("avg(ingreso)"))
z.show(etnias)

### Visualización 12
* Ingresos por género segun la rama

In [76]:
finalData.groupBy("rama").pivot("genero").max("ingreso").orderBy("rama")
val ingresoRama = data.groupBy("rama").pivot("genero").max("ingreso").orderBy("rama")
z.show(ingresoRama)

## Spark SQL

In [78]:
finalData.createOrReplaceTempView("MySQLTable")

**Consultas:**

* Consulta que devuelve la edad promedio de las personas encuestadas

In [81]:
spark.sql("""
    SELECT AVG(ms.edad)
    FROM MySQLTable ms
""")
    .show()


* Consulta que devuelve el número de hombres y mujeres casados a los 18 años

In [83]:
spark.sql("""
    SELECT count(*), ms.genero
    FROM MySQLTable ms
    WHERE ms.estado = '1 - Casado(a)' AND ms.edad = 18
    GROUP BY ms.genero
""")
    .show()


* Consulta que devuelve el número de mujeres que poseen un empleo no remunerado por cada provincia

In [85]:
spark.sql("""
    SELECT COUNT(*), ms.nombreProvincia
    FROM MySQLTable ms
    WHERE ms.genero = '2 - Mujer' AND ms.condicion = '5 - Empleo no remunerado'
    GROUP BY ms.nombreProvincia
    ORDER BY COUNT(*) DESC
""")
    .show(24, false)

* Consulta que devuelve el número de hombres y mujeres que pertenencen a las fuerzas armadas y cuentan con un título universitario

In [87]:
spark.sql("""
    SELECT COUNT(*)
    FROM MySQLTable ms
    WHERE ms.genero = '2 - Mujer' AND ms.ocupacion = '10 - Fuerzas Armadas' AND ms.instruccion = '09 - Superior Universitario'
""")
    .show(false)

* Consulta que devuelve el número de hombres y mujeres que tienen un Post-grado dentro de cada provincia

In [89]:
spark.sql("""
    SELECT COUNT(*), ms.genero, ms.nombreProvincia
    FROM MySQLTable ms
    WHERE instruccion = '10 - Post-grado'
    GROUP BY ms.genero, ms.nombreProvincia
    ORDER BY ms.nombreProvincia 
""")
    .show(48, false)

* Consulta que devuelve el número total de desempleados por cada provincia

In [91]:
spark.sql("""
    SELECT ms.nombreProvincia, COUNT(*)
    FROM MySQLTable ms
    WHERE ms.condicion LIKE '%Desempleo%'
    GROUP BY ms.nombreProvincia
    ORDER BY 2 DESC
""")
    .show(24, false)