# Análisis Exploratorio de Datos



# Exploración de la data desde el archivo .csv

In [2]:
import org.apache.spark.sql.types._

val mySchema = StructType(
    Array(
        StructField("area", IntegerType, false),
        StructField("ciudad", IntegerType, false),
        StructField("conglomerado", IntegerType, false),
        StructField("panelm", IntegerType, false),
        StructField("vivienda", IntegerType, false),
        StructField("hogar", IntegerType, false),
        StructField("acceso_principal", IntegerType, false),
        StructField("tipo_vivienda", IntegerType, false),
        StructField("techo_material", IntegerType, false),
        StructField("estado_techo", IntegerType, false),
        StructField("piso_material", IntegerType, false),
        StructField("estado_piso", IntegerType, false),
        StructField("pared_material", IntegerType, false),
        StructField("estado_pared", IntegerType, false),
        StructField("nro_cuartos", IntegerType, false),
        StructField("nro_dormitorios", IntegerType, false),
        StructField("nro_cuartos_negocio", IntegerType, false),
        StructField("cocina_cuarto", IntegerType, false),
        StructField("cocinar_material", IntegerType, false),
        StructField("tipo_servicio_higienico", IntegerType, false),
        StructField("alternativa_no_higienico", IntegerType, true),
        StructField("tipo_instalacion_sanitaria", IntegerType, true),
        StructField("obtencion_agua", IntegerType, false),
        StructField("medidor_agua", IntegerType, true),
        StructField("junta_agua", IntegerType, true),
        StructField("tipo_tuberia", IntegerType, false),
        StructField("ducha", IntegerType, false),
        StructField("tipo_alumbrado", StringType, false),
        StructField("eliminacion_basura", IntegerType, false),
        StructField("tenencia_vivienda", IntegerType, false),
        StructField("valor_arriendo", IntegerType, true),
        StructField("inarriendo_agua", IntegerType, true),
        StructField("inarriendo_luz", IntegerType, true),
        StructField("parentesco_propietario", IntegerType, true),
        StructField("posesion_vehiculos", IntegerType, false),
        StructField("cantidad_vehiculos", IntegerType, true),
        StructField("posesion_motos", IntegerType, true),
        StructField("cantidad_motos", IntegerType, true),
        StructField("abastecimiento_super", IntegerType, true),
        StructField("gasto_super", IntegerType, true),
        StructField("abastecimiento_extra", IntegerType, true),
        StructField("gasto_extra", IntegerType, true),
        StructField("abastecimiento_diesel", IntegerType, true),
        StructField("gasto_diesel", IntegerType, true),
        StructField("abastecimiento_eco", IntegerType, true),
        StructField("gasto_eco", IntegerType, true),
        StructField("abastecimiento_elect", IntegerType, true),
        StructField("gasto_elect", IntegerType, true),
        StructField("abastecimiento_gas", IntegerType, true),
        StructField("gasto_gas", IntegerType, true),
        StructField("estrato", IntegerType, false),
        StructField("fexp", StringType, false),
        StructField("upm", StringType, false),
        StructField("id_vivienda", StringType, false),
        StructField("id_hogar", StringType, false),
        StructField("periodo", IntegerType, false),
        StructField("mes", StringType, false)
        ));


In [3]:
val data = spark
    .read
    .schema(mySchema)
    .option("header", "true")
    .option("delimiter", ";")
    .csv("/workspace/zeppelin-paavanzada/enemdu_vivienda_hogar_2023_I_trimestre.csv")

## Columnas Cuantitativas

### Columna Valor Arriendo
Esta columna posee los valores que las personas pagan o pagarían por el arriendo de su hogar

In [6]:
data.select("valor_arriendo").summary().show()

In [7]:
val dfArriendoUnic = data.select("valor_arriendo").groupBy("valor_arriendo").count

dfArriendoUnic.count

z.show(dfArriendoUnic)

In [8]:
import org.apache.spark.sql.types._

val avg = data.select(mean("valor_arriendo"))
		.first()(0)//fila 0 columna 0
		.asInstanceOf[Double]

In [9]:

val stdDesv = data.select(stddev("valor_arriendo"))
		.first()(0)//fila 0 columna 0
		.asInstanceOf[Double]//Traelo como instacia de tipo Double
		
		

In [10]:
val inferior = avg - 3 * stdDesv //Si es negativo no tendremos ningun valor
val superior = avg + 3 * stdDesv

In [11]:
val valoresMenoresInferior =  data.select("valor_arriendo").where($"valor_arriendo" < inferior)
valoresMenoresInferior.describe().show

In [12]:
val valoresMayoresSuperior = data.select($"valor_arriendo").where($"valor_arriendo" > superior)
valoresMayoresSuperior.describe().show

In [13]:
val arriendoSinOutliers = data.select("valor_arriendo").where($"valor_arriendo" < superior)

arriendoSinOutliers.summary().show()

In [14]:
val dataSinOutliersUnic = arriendoSinOutliers.select("valor_arriendo").groupBy("valor_arriendo").count

dataSinOutliersUnic.count

In [15]:



//Colocar Dataframe sin Ouliers y solo con Valores Únicos 
//en el contexto para luego convertirlo a dataframe en Pyspark

z.put("dfSinOutliers", dataSinOutliersUnic)


In [16]:
%pyspark
from pyspark.sql import DataFrame
from pyspark.sql.functions import col
import matplotlib.pyplot as plt
import numpy as np
import scipy.stats as stats

# Obtener el DataFrame de dfCuantitativo
dfValorArriendo = DataFrame(z.get("dfSinOutliers"), sqlContext)

# Convertir la columna "valor_arriendo" a tipo Double
dfValorArriendo = dfValorArriendo.withColumn("valor_arriendo", col("valor_arriendo").cast("double"))

# Obtener los datos como una lista
valor_arriendo_data = dfValorArriendo.select("valor_arriendo").rdd.flatMap(lambda x: x).collect()

# Ordenar los datos de menor a mayor
valor_arriendo_data_sorted = sorted(valor_arriendo_data)

# Convertir los datos a un array de numpy
valor_arriendo_array = np.array(valor_arriendo_data_sorted)

# Crear el QQ plot con los datos 
plt.figure(figsize=(10, 8))
stats.probplot(valor_arriendo_array, plot=plt, dist='norm')

plt.title('QQ Plot de valor_arriendo')
plt.xlabel('Cuantiles teóricos')
plt.ylabel('Valores de arriendo')
plt.show()


## Columnas de Texto

In [18]:

val dfCualitativo = data.select("area",
"ciudad", 
"tipo_vivienda", 
"techo_material", 
"estado_techo", 
"piso_material",
"estado_piso",
"pared_material",
"estado_pared",
"tipo_servicio_higienico",
"alternativa_no_higienico",
"tipo_instalacion_sanitaria",
"tipo_tuberia",
"tenencia_vivienda",
"valor_arriendo",
"parentesco_propietario")

In [19]:
print(dfCualitativo.select("area")
            .distinct()
            .count() + "\n")
data.groupBy("area")
    .count()
    .sort("area")
    .show()


In [20]:
// Definir una función para realizar el reemplazo de valores en la columna "area"
val reemplazarArea = udf((valor: Int) => valor match {
  case 1 => "Urbana"
  case 2 => "Rural"
})

// Crear una nueva columna "area_etiqueta" con los valores reemplazados
val data_with_etiquetas = dfCualitativo.withColumn("area", reemplazarArea(col("area")))

// Agrupar y contar por "area_etiqueta"
data_with_etiquetas.groupBy("area").count().sort("area").show()

In [21]:
print(data.select("tipo_vivienda")
            .distinct()
            .count() + "\n")

data.groupBy("tipo_vivienda")
    .count()
    .sort(desc("count"))
    .show() 

In [22]:
// Definir una función para realizar el reemplazo de valores
val reemplazarTipoVivienda = udf((valor: Int) => valor match {
  case 1 => "Casa_o_villa"
  case 2 => "Departamento"
  case 3 => "Cuartos_en_casa_de_inquilinato"
  case 4 => "Mediagua"
  case 5 => "Rancho, covacha"
  case 6 => "Choza"
  case 7 => "Otra"
})

// Agrupar y contar por "tipo_vivienda"
data_with_etiquetas.groupBy("tipo_vivienda").count().sort("tipo_vivienda").show()

In [23]:
// Definir una función para realizar el reemplazo de valores en la columna "techo_material"
val reemplazarTechoMaterial = udf((valor: Int) => valor match {
  case 1 => "Hormigón (losa, cemento)"
  case 2 => "Fibrocemento, asbesto (eternit, eurolit)"
  case 3 => "Zinc, Aluminio"
  case 4 => "Teja"
  case 5 => "Palma, paja u hoja"
  case 6 => "Otro Material"
})


// Agrupar y contar por "techo_material_etiqueta"
data_with_etiquetas.groupBy("techo_material").count().sort("techo_material").show()

In [24]:
// Definir una función para realizar el reemplazo de valores en las columnas
val reemplazarEstadoTecho = udf((valor: Int) => valor match {
  case 1 => "Bueno"
  case 2 => "Regular"
  case 3 => "Malo"

})

val reemplazarPisoMaterial = udf((valor: Int) => valor match {
  case 1 => "Duela, parquet, tablón tratado o piso flotante"
  case 2 => "Cerámica, baldosa, vinil o porcelanato "
  case 3 => "Mármol o marmetón"
  case 4 => "Ladrillo o cemento"
  case 5=> "Tabla / tablón no tratado"
  case 6=> "Caña"
  case 7 => "Tierra"
  case 8=> "Otro Material"
})

val reemplazarEstadoPiso = udf((valor: Int) => valor match {
  case 1 => "Bueno"
  case 2 => "Regular"
  case 3 => "Malo"
 
})

val reemplazarParedMaterial = udf((valor: Int) => valor match {
  case 1 => "Hormigón/Ladrillo o Bloque"
  case 2 => "Asbesto/Cemento (Fibrolit)"
  case 3 => "Adobe o Tapia"
  case 4 => "Madera"
  case 5 => "Caña revestida o bahareque"
  case 6 => "Caña no revestida o estera"
  case 7 => "Otra Material"

})

val reemplazarEstadoPared = udf((valor: Int) => valor match {
  case 1 => "Bueno"
  case 2 => "Regular"
  case 3 => "Malo"

})


// Definir una función para realizar el reemplazo de valores en la columna "tipo_servicio_higienico"
val reemplazarTipoServicioHigienico = udf((valor: Int) => valor match {
  case 1 => "Excusado y alcantarillado"
  case 2 => "Excusado y pozo séptico"
  case 3 => "Excusado y pozo ciego"
  case 4 => "Letrina"
  case 5 => "No tiene"
 
})

// Definir una función para realizar el reemplazo de valores en la columna "alternativa_no_higienico"
val reemplazarAlternativaNoHigienico = udf((valor: Int) => valor match {
  case 1 => "Descarga directa al mar, río, lago o quebrada"
  case 2 => "Van al monte, campo, bota la basura en paquete"
  case 3 => "Usan una instalación sanitaria cercana y/o prestada"
 
})


// Definir una función para realizar el reemplazo de valores en la columna "tipo_instalacion_sanitaria"
val reemplazarTipoInstalacionSanitaria = udf((valor: Int) => valor match {
  case 1 => "Excusado y alcantarillado"
  case 2 => "Excusado y pozo séptico"
  case 3 => "Excusado y pozo ciego"
  case 4 => "Letrina"
 
})

// Definir una función para realizar el reemplazo de valores en la columna "tenencia_vivienda"
val reemplazarTenenciaVivienda = udf((valor: Int) => valor match {
  case 1 => "En arriendo"
  case 2 => "Anticresis y/o arriendo"
  case 3 => "Propia y la está pagando"
  case 4 => "Propia y totalmente pagada"
  case 5 => "Cedida"
  case 6 => "Recibida por servicios"
  case 7 => "Otra"
  
})

// Definir una función para realizar el reemplazo de valores en la columna "tipo_tuberia"
val reemplazarTipoTuberia = udf((valor: Int) => valor match {
  case 1 => "Por tubería dentro de la vivienda"
  case 2 => "Por tubería fuera de la vivienda pero en el lote"
  case 3 => "Por tubería fuera de la vivienda, lote o terreno"
  case 4 => "No recibe agua por tubería sino por otros medios"
 
})
val reemplazarParentescoPropietario = udf((valor: Int) => valor match {
  case 1 => "Si"
  case 2 => "NO"
})


In [25]:

// Crear un nuevo dataFrame con las columnas que han sido cambiadas
val data_with_etiquetas = dfCualitativo
  .withColumn("area", reemplazarArea(col("area")))
  .withColumn("tipo_vivienda", reemplazarTipoVivienda(col("tipo_vivienda")).cast(StringType))
  .withColumn("techo_material", reemplazarTechoMaterial(col("techo_material")).cast(StringType))
  .withColumn("estado_techo", reemplazarEstadoTecho(col("estado_techo")).cast(StringType))
  .withColumn("piso_material", reemplazarPisoMaterial(col("piso_material")).cast(StringType))
  .withColumn("estado_piso", reemplazarEstadoPiso(col("estado_piso")).cast(StringType))
  .withColumn("pared_material", reemplazarParedMaterial(col("pared_material")).cast(StringType))
  .withColumn("estado_pared", reemplazarEstadoPared(col("estado_pared")).cast(StringType))
   .withColumn("tipo_servicio_higienico", reemplazarTipoServicioHigienico(col("tipo_servicio_higienico")).cast(StringType))
  .withColumn("alternativa_no_higienico", reemplazarAlternativaNoHigienico(col("alternativa_no_higienico")).cast(StringType))
  .withColumn("tipo_instalacion_sanitaria", reemplazarTipoInstalacionSanitaria(col("tipo_instalacion_sanitaria")).cast(StringType))
  .withColumn("tenencia_vivienda", reemplazarTenenciaVivienda(col("tenencia_vivienda")).cast(StringType))
  .withColumn("tipo_tuberia", reemplazarTipoTuberia(col("tipo_tuberia")).cast(StringType))
 .withColumn("parentesco_propietario ",reemplazarParentescoPropietario(col("parentesco_propietario")).cast(StringType))


In [26]:
data_with_etiquetas.printSchema

In [27]:
data_with_etiquetas.groupBy("tipo_instalacion_sanitaria").count().sort("tipo_instalacion_sanitaria").show()
data_with_etiquetas.groupBy("tenencia_vivienda").count().sort("tenencia_vivienda").show()
data_with_etiquetas.groupBy("tipo_tuberia").count().sort("tipo_tuberia").show()
data_with_etiquetas.groupBy("tipo_servicio_higienico").count().sort("tipo_servicio_higienico").show()
data_with_etiquetas.groupBy("alternativa_no_higienico").count().sort("alternativa_no_higienico").show()
data_with_etiquetas.groupBy("estado_techo").count().sort("estado_techo").show()
data_with_etiquetas.groupBy("piso_material").count().sort("piso_material").show()
data_with_etiquetas.groupBy("estado_piso").count().sort("estado_piso").show()
data_with_etiquetas.groupBy("pared_material").count().sort("pared_material").show()
data_with_etiquetas.groupBy("estado_pared").count().sort("estado_pared").show()

In [28]:
val viviendasPorTipo = data_with_etiquetas.groupBy("tipo_vivienda").count().sort(desc("count"))


// Mostrar la cantidad de viviendas por tipo de vivienda
viviendasPorTipo.show()

z.show(viviendasPorTipo)

In [29]:
val dfFiltradoTipoVivienda = data_with_etiquetas.filter($"tipo_vivienda" === "Departamento")

// Filtrar por estado del techo "Regular"
val dfFiltradoEstadoTecho = data_with_etiquetas.filter($"estado_techo" === "Regular")

// Filtrar por tipo de vivienda "Departamento" y estado del techo "Regular"
val dfFiltrado = data_with_etiquetas.filter($"tipo_vivienda" === "Departamento" && $"estado_techo" === "Regular")

dfFiltrado.count()

z.show(dfFiltrado)

In [30]:
import org.apache.spark.sql.functions

// Convertir la columna "valor_arriendo" a tipo Double
val dfConValorArriendo = data_with_etiquetas.withColumn("valor_arriendo", $"valor_arriendo".cast("Double"))

// Calcular el promedio del valor arriendo por tipo de vivienda
val promedioArriendoPorTipo = dfConValorArriendo.groupBy("tipo_vivienda")
  .agg(functions.avg($"valor_arriendo").alias("promedio_arriendo"))
  .sort(desc("promedio_arriendo"))

// Mostrar el resultado
promedioArriendoPorTipo.show()



In [31]:
val viviendasPorTipo = data_with_etiquetas.filter($"area" === "Urbana")
  .groupBy("tipo_vivienda")
  .count()
  .sort(desc("count"))
z.show(viviendasPorTipo)

## Gráficas realizadas con Conexión a la Base de Datos

In [33]:
%mySQL(saveAs=consulAngular01)

SELECT subq.nombre_provincia, subq.tipo_vivienda, subq.promedio_arriendo
FROM (
    SELECT p.nombre_provincia, v.tipo_vivienda, AVG(h.valor_arriendo) AS promedio_arriendo, p.tasa_desempleo
    FROM provincia p
    JOIN canton c ON p.cod_provincia = c.provincia_canton
    JOIN parroquia pa ON c.cod_canton = pa.cod_canton
    JOIN vivienda v ON pa.cod_parroquia = v.cod_parroquia
    JOIN hogar h ON v.id_vivienda = h.id_vivienda
    GROUP BY p.nombre_provincia, v.tipo_vivienda, p.tasa_desempleo
) AS subq
WHERE subq.tasa_desempleo < (SELECT AVG(tasa_desempleo) FROM provincia)
ORDER BY subq.nombre_provincia DESC;


In [34]:
val dfFromMysqlAng = z.get("consulAngular01")

val arr = dfFromMysqlAng.toString.split("\n")
val colsName = arr(0).split("\t")
val dataArr = arr.drop(1)
val data = dataArr.map(row=> row.split("\t")).map(row=> (row(0), row(1), row(2).toDouble))

val dfAux = spark
 .createDataFrame(data)
 .toDF(colsName(0), colsName(1), colsName(2))

In [35]:
val auxDF = dfAux.groupBy("nombre_provincia").pivot("tipo_vivienda").avg("promedio_arriendo").orderBy("nombre_provincia").na.fill(0)
auxDF.show

In [36]:
val json = auxDF.toJSON.map(row => row.mkString).collectAsList

z.angularBind("dataAsJSON", json)

In [37]:
%angular
<input type="text" id="data4Spark" value={{dataAsJSON}}>

In [38]:

%angular
<script>
var cleanText = $('#data4Spark').val().replaceAll("\\", "").replaceAll("\"{", "{").replaceAll("}\"",
"}");
console.log(cleanText);
$('#data4Spark').val(cleanText);
</script>

In [39]:
%angular
<style>
#chartdiv {
    width: 100%;
    height: 500px;
}
</style>

<!-- Resources -->
<script src="https://cdn.amcharts.com/lib/5/index.js"></script>
<script src="https://cdn.amcharts.com/lib/5/xy.js"></script>
<script src="https://cdn.amcharts.com/lib/5/themes/Animated.js"></script>

<!-- Chart code -->
<script>

// Leer los datos desde el input (No necesita cambio) y transformarlos a JSON
var data = JSON.parse($('#data4Spark').val());
console.log(data);
am5.ready(function() {
// Create root element
// https://www.amcharts.com/docs/v5/getting-started/#Root_element
var root = am5.Root.new("chartdiv");
// Set themes
// https://www.amcharts.com/docs/v5/concepts/themes/
root.setThemes([
am5themes_Animated.new(root)
]);
// Create chart
// https://www.amcharts.com/docs/v5/charts/xy-chart/
var chart = root.container.children.push(am5xy.XYChart.new(root, {
panX: false,
panY: false,
wheelX: "panX",
wheelY: "zoomX",
layout: root.verticalLayout
}));
// Add legend
// https://www.amcharts.com/docs/v5/charts/xy-chart/legend-xy-series/
var legend = chart.children.push(
am5.Legend.new(root, {
centerX: am5.p50,
x: am5.p50,
position: 'top'
})
);
// Create axes
// https://www.amcharts.com/docs/v5/charts/xy-chart/axes/
var xRenderer = am5xy.AxisRendererX.new(root, {
cellStartLocation: 0.1,
cellEndLocation: 0.9
})
var xAxis = chart.xAxes.push(am5xy.CategoryAxis.new(root, {
categoryField: "nombre_provincia", //Cambiar por su columna de agrupamiento
renderer: xRenderer,
tooltip: am5.Tooltip.new(root, {})
}));
xRenderer.grid.template.setAll({
location: 1
})
xAxis.data.setAll(data);
var yAxis = chart.yAxes.push(am5xy.ValueAxis.new(root, {
renderer: am5xy.AxisRendererY.new(root, {
strokeOpacity: 0.1
})
}));
// Add series
// https://www.amcharts.com/docs/v5/charts/xy-chart/series/
function makeSeries(name, fieldName) {
var series = chart.series.push(am5xy.ColumnSeries.new(root, {
name: name,
xAxis: xAxis,
yAxis: yAxis,
valueYField: fieldName,
categoryXField: "nombre_provincia" //Cambiar por su columna de agrupamiento
}));
series.columns.template.setAll({
tooltipText: "{name}, {categoryX}:{valueY.formatNumber('###.##')}",
width: am5.percent(90),
tooltipY: 0,
strokeOpacity: 0
});
series.data.setAll(data);
// Make stuff animate on load
// https://www.amcharts.com/docs/v5/concepts/animations/
series.appear();
series.bullets.push(function() {
return am5.Bullet.new(root, {
locationY: 0,
sprite: am5.Label.new(root, {
text: "{valueY}",
fill: root.interfaceColors.get("alternativeText"),
centerY: 0,
centerX: am5.p50,
populateText: true
})
});
});
legend.data.push(series);
}
//Necesita adaptar para el resto de sus columnas, una llamada a makeSeries por cada columna
//En makeSeries, el primer parámetro es el valor a mostrar y el segundo es el nombre de
//la columna o propiedad JSON
makeSeries("Casa o Villa", "Casa o Villa");
makeSeries("Choz", "Choza");
makeSeries("Cuartos en casa de inquilinato", "Cuartos en casa de inquilinato");
makeSeries("Departamento", "Departamento");
makeSeries("Mediagua", "Mediagua");
makeSeries("Rancho", "Rancho, covacha");
// Make stuff animate on load
//https://www.amcharts.com/docs/v5/concepts/animations/
chart.appear(1000, 100);
}); // end am5.ready()
</script>
<!-- HTML -->
<div id="chartdiv"></div>

 

### Consulta Cristian Rodriguez

In [41]:
%mySQL
SELECT
    v.tipo_vivienda,
    h.material_piso,
    COUNT(v.id_vivienda) AS nros_vivienda
FROM
    vivienda v
JOIN
    hogar h ON v.id_vivienda = h.id_vivienda
GROUP BY
    v.tipo_vivienda, h.material_piso;

In [42]:
%mySQL
SELECT
    v.tipo_vivienda,
    h.material_paredes,
    COUNT(v.id_vivienda) AS nros_vivienda
FROM
    vivienda v
JOIN
    hogar h ON v.id_vivienda = h.id_vivienda
GROUP BY
    v.tipo_vivienda, h.material_paredes;

### Proporción del estado del hogar según el material

Estado de paredes, piso y techo según su material

In [44]:
val query = """
    (SELECT estado_techo, estado_piso, estado_paredes, material_piso, material_techo, material_paredes
     FROM hogar) as qryData
    """
    
val tempDF = spark.read
    .format("jdbc")
    .option("url", "jdbc:mysql://localhost:3306/mydb_integrador") //"jdbc:mysql://hostname:port/dbname"
    .option("driver", "com.mysql.jdbc.Driver")
    .option("user", "root")
    .option("password", "")
    .option("dbtable", query)
    .load()


In [45]:
z.show(tempDF.limit(5))

In [46]:
val df_techos= tempDF.groupBy("estado_techo", "material_techo").count()

In [47]:
z.show(df_techos)

In [48]:
///DF para Techos
val df_techos= tempDF.groupBy("estado_techo", 
                              "material_techo").count()
                              
//Df para Paredes
val df_paredes= tempDF.groupBy("estado_paredes", 
                              "material_paredes").count()
                              
//DF para Piso
val df_piso= tempDF.groupBy("estado_piso", 
                              "material_piso").count()


In [49]:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val df_groupedwTotal = df_techos.withColumn(
    "total", 
    sum("count").over(Window.partitionBy("material_techo")))

val df_groupedwProportion = df_groupedwTotal.withColumn(
    "proportion", 
    col("count") / col("total"))


val df_groupedwTotalParedes = df_paredes.withColumn(
    "total", 
    sum("count").over(Window.partitionBy("material_paredes")))

val df_groupedwProportionParedes = df_groupedwTotalParedes.withColumn(
    "proportion", 
    col("count") / col("total"))
    

val df_groupedwTotalPiso = df_piso.withColumn(
    "total", 
    sum("count").over(Window.partitionBy("material_piso")))

val df_groupedwProportionPiso = df_groupedwTotalPiso.withColumn(
    "proportion", 
    col("count") / col("total"))


In [50]:
z.show(df_groupedwProportion)

In [51]:
z.put("df_grouped_correlation", df_groupedwProportion)
z.put("df_grouped_paredes", df_groupedwProportionParedes)
z.put("df_grouped_piso", df_groupedwProportionPiso)


In [52]:
%pyspark
from pyspark.sql import DataFrame
import pandas as pd

dftecho = DataFrame(z.get("df_grouped_correlation"), sqlContext)
pdftecho = dftecho.toPandas()

dfpared = DataFrame(z.get("df_grouped_paredes"), sqlContext)
pdfpared = dfpared.toPandas()

dfpiso = DataFrame(z.get("df_grouped_piso"), sqlContext)
pdfpiso = dfpiso.toPandas()



In [53]:
%pyspark
import numpy as np
import matplotlib.pyplot as plt

# Crear un gráfico de barras agrupadas con tamaño más grande
fig, ax = plt.subplots(figsize=(12, 8))  # Ajustar el tamaño aquí

# Lista de estados del techo y materiales únicos
estados_techo = pdftecho['estado_techo'].unique()
materiales_techo = pdftecho['material_techo'].unique()

# Diccionario para asignar colores a cada estado del techo
colores_dict = {
    'Malo': 'r',      # Rojo
    'Regular': 'c',   # Amarillo
    'Bueno': 'g'      # Verde
}

# Ancho de las barras agrupadas
bar_width = 0.2

# Posiciones para cada grupo de barras
positions = np.arange(len(materiales_techo))

# Crear las barras agrupadas y agregar etiquetas
for i, estado_techo in enumerate(estados_techo):
    estado_color = colores_dict.get(estado_techo, 'k')
    data_estado = pdftecho[pdftecho['estado_techo'] == estado_techo]['proportion']
    ax.bar(
        positions + i * bar_width, 
        data_estado, 
        bar_width, 
        label=estado_techo, 
        color=estado_color
    )

    # Agregar etiquetas a las barras
    for j in range(len(materiales_techo)):
        ax.text(
            positions[j] + i * bar_width, 
            data_estado.iloc[j] + 0.01, 
            f"{data_estado.iloc[j]:.2f}", 
            ha='center'
        )

# Añadir la leyenda, etiquetas de eje y título
ax.legend(title='Estado del techo')
plt.xlabel('Material del techo')
plt.ylabel('Proporción')
plt.title('Proporción de estado del techo por material')

# Posiciones del eje x y etiquetas para cada barra agrupada
ax.set_xticks(positions + (bar_width * len(estados_techo) / 2))
ax.set_xticklabels(materiales_techo)

# Rotar las etiquetas del eje x
plt.xticks(rotation=-45)

# Mostrar el gráfico
plt.tight_layout()
plt.show()


In [54]:
%pyspark
import numpy as np
import matplotlib.pyplot as plt

# Crear un gráfico de barras agrupadas con tamaño más grande
fig, ax = plt.subplots(figsize=(12, 8))  # Ajustar el tamaño aquí

# Lista de estados del techo y materiales únicos
estados_pared = pdfpared['estado_paredes'].unique()
materiales_pared = pdfpared['material_paredes'].unique()

# Diccionario para asignar colores a cada estado del techo
colores_dict = {
    'Malo': 'r',      # Rojo
    'Regular': 'c',   # Amarillo
    'Bueno': 'g'      # Verde
}

# Ancho de las barras agrupadas
bar_width = 0.2

# Posiciones para cada grupo de barras
positions = np.arange(len(materiales_pared))

# Crear las barras agrupadas y agregar etiquetas
for i, estado_pared in enumerate(estados_pared):
    estado_color = colores_dict.get(estado_pared, 'k')
    data_estado = pdfpared[pdfpared['estado_paredes'] == estado_pared]['proportion']
    ax.bar(
        positions + i * bar_width, 
        data_estado, 
        bar_width, 
        label=estado_pared, 
        color=estado_color
    )

    # Agregar etiquetas a las barras
    for j in range(len(materiales_pared)):
        ax.text(
            positions[j] + i * bar_width, 
            data_estado.iloc[j] + 0.01, 
            f"{data_estado.iloc[j]:.2f}", 
            ha='center'
        )

# Añadir la leyenda, etiquetas de eje y título
ax.legend(title='Estado de las paredes')
plt.xlabel('Material de las paredes')
plt.ylabel('Proporción')
plt.title('Proporción de estado de las paredes por material')

# Posiciones del eje x y etiquetas para cada barra agrupada
ax.set_xticks(positions + (bar_width * len(estados_techo) / 2))
ax.set_xticklabels(materiales_pared)

# Rotar las etiquetas del eje x
plt.xticks(rotation=-45)

# Mostrar el gráfico
plt.tight_layout()
plt.show()


In [55]:
%pyspark
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

# Lista de estados del techo y materiales únicos
estados_piso = pdfpiso['estado_piso'].unique()
materiales_piso = pdfpiso['material_piso'].unique()

# Crear un DataFrame con todas las combinaciones posibles de material_piso y estado_piso
all_combinations = pd.DataFrame([(m, e) for m in materiales_piso for e in estados_piso], columns=['material_piso', 'estado_piso'])

# Hacer un merge de los datos originales (pdfpiso) con todas las combinaciones posibles
merged_data = pd.merge(all_combinations, pdfpiso, how='left', on=['material_piso', 'estado_piso'])
merged_data.fillna(0, inplace=True)  # Rellenar los valores NaN con cero

# Crear un gráfico de barras agrupadas con tamaño más grande
fig, ax = plt.subplots(figsize=(12, 8))  # Ajustar el tamaño aquí

# Diccionario para asignar colores a cada estado del techo
colores_dict = {
    'Malo': 'r',      # Rojo
    'Regular': 'c',   # Amarillo
    'Bueno': 'g'      # Verde
}

# Ancho de las barras agrupadas
bar_width = 0.2

# Posiciones para cada grupo de barras
positions = np.arange(len(materiales_piso))

# Crear las barras agrupadas y agregar etiquetas
for i, estado_piso in enumerate(estados_piso):
    estado_color = colores_dict.get(estado_piso, 'k')
    data_estado = merged_data[merged_data['estado_piso'] == estado_piso]['proportion']
    ax.bar(
        positions + i * bar_width, 
        data_estado, 
        bar_width, 
        label=estado_piso, 
        color=estado_color
    )

    # Agregar etiquetas a las barras
    for j in range(len(materiales_piso)):
        ax.text(
            positions[j] + i * bar_width, 
            data_estado.iloc[j] + 0.01, 
            f"{data_estado.iloc[j]:.2f}", 
            ha='center'
        )

# Añadir la leyenda, etiquetas de eje y título
ax.legend(title='Estado del techo')
plt.xlabel('Material del techo')
plt.ylabel('Proporción')
plt.title('Proporción de estado del techo por material')

# Posiciones del eje x y etiquetas para cada barra agrupada
ax.set_xticks(positions + (bar_width * len(estados_piso) / 2))
ax.set_xticklabels(materiales_piso)

# Rotar las etiquetas del eje x
plt.xticks(rotation=-45)

# Mostrar el gráfico
plt.tight_layout()
plt.show()


## Grafico de dispersion
***tasa de desempleo - gasto de combustibles***
este grafico permite identificar comportamiento atipico entre las provincias en relacion son su tasa de desempleo promedio y la relacion que tienen con el gato de combustibel.
Se encontro que Tungurahua presentan un comportamiento atipo a las demas indicando que a pesar que la tasa de desempleo es alta su gato de combustible tambien lo es a diferencia de las otras.
Esto podria indicar que tiene potencial para proyectos de inversion orientado a combustibles, ademas se podria deducir que albergan industrias estrategicas para la economia.
Ademas se infiere que las provincias con un comportamiento inusual al resto podrian ser areas de mayor crecimiento economico.

In [57]:
%mySQL(saveAs= result2)

SELECT 
    p.nombre_provincia,
    AVG(p.tasa_desempleo) AS tasa_desempleo_promedio,
    SUM(hc.gasto_combustible) AS total_gasto_combustible
FROM
    provincia p
JOIN
    canton c ON p.cod_provincia = c.provincia_canton
JOIN
    parroquia pa ON c.cod_canton = pa.cod_canton
JOIN
    vivienda v ON pa.cod_parroquia = v.cod_parroquia
JOIN
    hogar h ON v.id_vivienda = h.id_vivienda
JOIN
    hogar_combustibles hc ON h.id_hogar = hc.id_hogar
WHERE p.nombre_provincia != ' Zonas No Delimitadas' 
GROUP BY
    p.nombre_provincia;



In [58]:
%pyspark
dfdemo2 = z.getAsDataFrame('result2')
type(dfdemo2)

In [59]:
%pyspark
import matplotlib.pyplot as plt

# Crear el gráfico de dispersión
plt.figure(figsize=(10, 6))
plt.scatter(dfdemo2['tasa_desempleo_promedio'], dfdemo2['total_gasto_combustible'])

# Etiquetas y título del gráfico
plt.xlabel("Tasa de Desempleo Promedio")
plt.ylabel("Gasto en Combustible")
plt.title("Relación entre Tasa de Desempleo y Gasto en Combustible por Provincia")

# Etiquetas para cada punto (provincia)
for i, row in dfdemo2.iterrows():
    plt.text(row['tasa_desempleo_promedio'], row['total_gasto_combustible'], row['nombre_provincia'])

# Mostrar el gráfico
plt.show()



## Porcentaje de viviendas con servicios básicos ideales 

In [61]:
%mySQL(saveAs=result5)
SELECT p.nombre_provincia, 
       COUNT(*) AS cantidad_hogares,
       COUNT(*) * 100.0 / total_hogares_provincia.total AS porcentaje
FROM provincia p
INNER JOIN canton c ON p.cod_provincia = c.provincia_canton
INNER JOIN parroquia pa ON c.cod_canton = pa.cod_canton
INNER JOIN vivienda v ON pa.cod_parroquia = v.cod_parroquia
INNER JOIN hogar h ON v.id_vivienda = h.id_vivienda
INNER JOIN servicios_basicos sb ON h.id_hogar = sb.id_hogar
INNER JOIN (
  SELECT p.cod_provincia, COUNT(*) AS total
  FROM provincia p
  INNER JOIN canton c ON p.cod_provincia = c.provincia_canton
  INNER JOIN parroquia pa ON c.cod_canton = pa.cod_canton
  INNER JOIN vivienda v ON pa.cod_parroquia = v.cod_parroquia
  INNER JOIN hogar h ON v.id_vivienda = h.id_vivienda
  INNER JOIN servicios_basicos sb ON h.id_hogar = sb.id_hogar
  GROUP BY p.cod_provincia
) AS total_hogares_provincia ON p.cod_provincia = total_hogares_provincia.cod_provincia
WHERE sb.agua_tuberia = 'Por tubería dentro de la vivienda' AND sb.servicio_higienico = 'Excusado y alcantarillado'
GROUP BY p.nombre_provincia, total_hogares_provincia.total;


In [62]:
val dfFromMysql5 = z.get("result5")


In [63]:
val arr = dfFromMysql5.toString.split("\n")
val colsName = arr(0).split("\t")
val dataArr = arr.drop(1)
val data = dataArr.map(row=> row.split("\t")).map(row=> (row(0), row(1), row(2).toDouble))

In [64]:
val dfAux = spark
 .createDataFrame(data)
 .toDF(colsName(0), colsName(1), colsName(2))

In [65]:
dfAux.show

In [66]:
val json = dfAux.toJSON.map(row => row.mkString).collectAsList

In [67]:
z.angularBind("dataAsJSON", json)

In [68]:
%angular
<input type="text" id="data4Spark" value={{dataAsJSON}}>

In [69]:
%angular
<script>
var cleanText = $('#data4Spark').val().replaceAll("\\", "").replaceAll("\"{", "{").replaceAll("}\"",
"}");
console.log(cleanText);
$('#data4Spark').val(cleanText);
</script>

In [70]:
%angular
<style>
#chartdiv2 {
    width: 100%;
    height: 500px;
}
</style>

<!-- Resources -->
<script src="https://cdn.amcharts.com/lib/5/index.js"></script>
<script src="https://cdn.amcharts.com/lib/5/xy.js"></script>
<script src="https://cdn.amcharts.com/lib/5/themes/Animated.js"></script>

<!-- Chart code -->
<script>

// Leer los datos desde el input (No necesita cambio) y transformarlos a JSON
var data = JSON.parse($('#data4Spark').val());
console.log(data);
am5.ready(function() {
// Create root element
// https://www.amcharts.com/docs/v5/getting-started/#Root_element
var root = am5.Root.new("chartdiv");
// Set themes
// https://www.amcharts.com/docs/v5/concepts/themes/
root.setThemes([
am5themes_Animated.new(root)
]);
// Create chart
// https://www.amcharts.com/docs/v5/charts/xy-chart/
var chart = root.container.children.push(am5xy.XYChart.new(root, {
panX: false,
panY: false,
wheelX: "panX",
wheelY: "zoomX",
layout: root.verticalLayout
}));
// Add legend
// https://www.amcharts.com/docs/v5/charts/xy-chart/legend-xy-series/
var legend = chart.children.push(
am5.Legend.new(root, {
centerX: am5.p50,
x: am5.p50,
position: 'top'
})
);
// Create axes
// https://www.amcharts.com/docs/v5/charts/xy-chart/axes/
var xRenderer = am5xy.AxisRendererX.new(root, {
cellStartLocation: 0.1,
cellEndLocation: 0.9
})
var xAxis = chart.xAxes.push(am5xy.CategoryAxis.new(root, {
categoryField: "nombre_provincia", //Cambiar por su columna de agrupamiento
renderer: xRenderer,
tooltip: am5.Tooltip.new(root, {})
}));
xRenderer.grid.template.setAll({
location: 1
})
xAxis.data.setAll(data);
var yAxis = chart.yAxes.push(am5xy.ValueAxis.new(root, {
renderer: am5xy.AxisRendererY.new(root, {
strokeOpacity: 0.1
})
}));
// Add series
// https://www.amcharts.com/docs/v5/charts/xy-chart/series/
function makeSeries(name, fieldName) {
var series = chart.series.push(am5xy.ColumnSeries.new(root, {
name: name,
xAxis: xAxis,
yAxis: yAxis,
valueYField: fieldName,
categoryXField: "nombre_provincia" //Cambiar por su columna de agrupamiento
}));
series.columns.template.setAll({
tooltipText: "{name}, {categoryX}:{valueY.formatNumber('###.##')}",
width: am5.percent(90),
tooltipY: 0,
strokeOpacity: 0
});
series.data.setAll(data);
// Make stuff animate on load
// https://www.amcharts.com/docs/v5/concepts/animations/
series.appear();
series.bullets.push(function() {
return am5.Bullet.new(root, {
locationY: 0,
sprite: am5.Label.new(root, {
text: "{valueY}",
fill: root.interfaceColors.get("alternativeText"),
centerY: 0,
centerX: am5.p50,
populateText: true
})
});
});
legend.data.push(series);
}
//Necesita adaptar para el resto de sus columnas, una llamada a makeSeries por cada columna
//En makeSeries, el primer parámetro es el valor a mostrar y el segundo es el nombre de
//la columna o propiedad JSON
//makeSeries("cantidad_hogares", "cantidad_hogares");
makeSeries("porcentaje", "porcentaje");
// Make stuff animate on load
//https://www.amcharts.com/docs/v5/concepts/animations/
chart.appear(1000, 100);
}); // end am5.ready()
</script>
<!-- HTML -->
<div id="chartdiv"></div>
