## Cálculo de Outliers de la variable Factor de Expansión

In [1]:
import org.apache.spark.sql.types._
val myDataSchema = StructType(
    Array(
        StructField("id_persona", DecimalType(26, 0), true), 
        StructField("anio", IntegerType, true), 
        StructField("mes", IntegerType, true), 
        StructField("provincia", IntegerType, true), 
        StructField("canton", IntegerType, true), 
        StructField("area", StringType, true), 
        StructField("genero", StringType, true), 
        StructField("edad", IntegerType, true), 
        StructField("estado_civil", StringType, true), 
        StructField("nivel_de_instruccion", StringType, true), 
        StructField("etnia", StringType, true), 
        StructField("ingreso_laboral", IntegerType, true), 
        StructField("condicion_actividad", StringType, true), 
        StructField("sectorizacion", StringType, true), 
        StructField("grupo_ocupacion", StringType, true), 
        StructField("rama_actividad", StringType, true), 
        StructField("factor_expansion", DoubleType, true)
    ));
val df = spark.read.schema(myDataSchema).option("header","true").option("delimiter","\t").csv("/home/david/Datos_ENEMDU_PEA_v2.csv")

In [2]:
val dfFactExpasion= df.drop("id_persona").select($"factor_expansion").filter("factor_expansion is NOT NULL")
val cuartiles = dfFactExpasion.stat.approxQuantile("factor_expansion",Array(0.25,0.75),0.00)
val dest= dfFactExpasion.select(stddev("factor_expansion")).first()(0).asInstanceOf[Double]
val avg= dfFactExpasion.select(mean("factor_expansion")).first()(0).asInstanceOf[Double]

In [3]:
val minimo = avg - 3 * dest
val maximo = avg + 3 * dest

In [4]:
val valoresMenoresli  = dfFactExpasion
.where($"factor_expansion" < minimo)
valoresMenoresli.describe().show

In [5]:
val valoresMayoresls  = dfFactExpasion
.where($"factor_expansion" >  maximo)
valoresMayoresls.describe().show

In [6]:
val inferioriqr  = q1 - 1.5 *iqr
val superioriqr  = q3 + 1.5*iqr

In [7]:
val dfSinOutliers = dfFactExpasion.where($"factor_expansion" > inferioriqr  && $"factor_expansion" < superioriqr).withColumn("factor_expansion", format_number($"factor_expansion", 3))
dfSinOutliers.summary().show

In [8]:
val q1 = cuartiles(0)
val q3 =  cuartiles(1)
val iqr = q3 - q1

In [9]:
val rangos = dfSinOutliers.withColumn("range", when($"factor_expansion" >  0 and $"factor_expansion" <= q1, lit("0 a  q1"))
                .otherwise(when($"factor_expansion" > q1 and $"factor_expansion" <= q3-q1, lit(" q1 a q2"))
                .otherwise(when($"factor_expansion" > q3 - q1 and $"factor_expansion" <= q3, lit("q2 a q3"))
                .otherwise(when($"factor_expansion" > q3 and $"factor_expansion" <= superioriqr, lit("q3 a limite iqr"))
                ) ) ) 

         ).groupBy("range").count().orderBy("range").withColumnRenamed("count","Cantidad")
rangos.show

In [10]:
val total = dfSinOutliers.count
val porcentaje = rangos.select($"range",$"Cantidad"/total *100).withColumnRenamed("((Cantidad / 590758) * 100)","porcentaje")
val resultado = rangos.join(porcentaje,Seq("range")).withColumn("porcentaje", format_number($"porcentaje", 3)).sort(desc("porcentaje"))
resultado.show


In [11]:
resultado.select(sum("porcentaje")).show
resultado.select(sum("Cantidad")).show