In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-2.4.8/spark-2.4.8-bin-hadoop2.7.tgz
!tar xf spark-2.4.8-bin-hadoop2.7.tgz
!pip install -q pyspark
!pip install -q findspark

[K     |████████████████████████████████| 281.4 MB 34 kB/s 
[K     |████████████████████████████████| 198 kB 60.0 MB/s 
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [2]:
import os # libreria de manejo del sistema operativo
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.8-bin-hadoop2.7"
os.environ["PYTHON_PATH"] = "/content/spark-2.4.8-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip"

In [3]:
# findspark permite usar pyspark (interfaz de Python a Spark),
# desde cualquier programa escrito en Python.
import findspark
findspark.init()
import pandas as pd
import pyspark
from pyspark.sql import SparkSession
from pyspark  import SparkContext
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc=SparkContext.getOrCreate()
sc

In [4]:
archivo = './sample_data/Persona.csv'


df= (spark.read
     .format("csv")
     .option('header','true')
     .option('inferSchema','true')
     .option('sep',';')
     .load(archivo))


In [5]:
#Fase 1
#1.- ver el df , primero 20 registros
print("ver 20 primeros datos: \n")

df.show(20)

#2.- Cantidad de registros
print("la cantidad de registros es: \n",df.count())


#3.- Esquema
print("el esquema es: \n")
df.dtypes



ver 20 primeros datos: 

+----+----------------+-------+-------------+--------------+-----------------+---------+------------------+------+--------+--------+----------+-------------+--------+-------+
|Edad|    TipoEduccion|Ahorros|Nivel_escolar|anos_educacion|        profesion| relacion|              raza|  sexo|ganancia|perdidas|horas_trab|         pais|ingresos| clases|
+----+----------------+-------+-------------+--------------+-----------------+---------+------------------+------+--------+--------+----------+-------------+--------+-------+
|  50|Self-emp-not-inc|  83311|    Bachelors|            13|  Exec-managerial|  Husband|             White|  Male|       0|       0|        13|United-States|   <=50K|casados|
|  53|         Private| 234721|         11th|             7|Handlers-cleaners|  Husband|             Black|  Male|       0|       0|        40|United-States|   <=50K|casados|
|  28|         Private| 338409|    Bachelors|            13|   Prof-specialty|     Wife|            

[('Edad', 'string'),
 ('TipoEduccion', 'string'),
 ('Ahorros', 'string'),
 ('Nivel_escolar', 'string'),
 ('anos_educacion', 'string'),
 ('profesion', 'string'),
 ('relacion', 'string'),
 ('raza', 'string'),
 ('sexo', 'string'),
 ('ganancia', 'string'),
 ('perdidas', 'string'),
 ('horas_trab', 'string'),
 ('pais', 'string'),
 ('ingresos', 'string'),
 ('clases', 'string')]

In [6]:
from pyspark.sql.functions import isnull,isnan,when,count,col
import numpy as np
from pyspark.sql.functions import when


#5.- conversion de datos (cast)
df = df.select(col('Edad').cast('int'),
                         col('TipoEduccion'),
                         col('Ahorros').cast('int'),
                         col('Nivel_escolar'),
                         col('anos_educacion').cast('float'),
                         col('profesion'),
                         col('relacion'),
                         col('raza'),
                         col('sexo'),
                         col('ganancia').cast('float'),
                         col('perdidas').cast('float'),
                         col('horas_trab').cast('float'),
                         col('pais'),
                         col('ingresos'),
                         col('clases')
                        )


#asignamos valores 0 a nulos en campos numericos

#4.- Maximo,minimo.media y cantidad
print("Maximo,minimo, media y cantidad de columnas numericas: \n")
df.describe().show()


#Fase de preparacion

#1 mostrar cantidad de valores null y nan de las columnas
print("Cantidad de valores null y nan de todas las columnas : \n")
df.select([count(when(isnan(c)|isnull(c), c)).alias(c) for c in df.columns]).show()

#2.- remplazo edad por nan
df =  df.withColumn("Edad",when(df.Edad==0,np.nan).otherwise(df.Edad))

#3.- remplazo nivel escolar nulo por bachelors
df = df.fillna("Bachelors", ["Nivel_escolar"])

#4. De la columna raza eliminar todas las filas nulas de la columna raza. [5 ptos]
df = df.dropna(subset = "raza")


Maximo,minimo, media y cantidad de columnas numericas: 

+-------+------------------+------------+------------------+-------------+-----------------+------------+--------+------------------+------+------------------+------------------+-----------------+--------+--------+--------+
|summary|              Edad|TipoEduccion|           Ahorros|Nivel_escolar|   anos_educacion|   profesion|relacion|              raza|  sexo|          ganancia|          perdidas|       horas_trab|    pais|ingresos|  clases|
+-------+------------------+------------+------------------+-------------+-----------------+------------+--------+------------------+------+------------------+------------------+-----------------+--------+--------+--------+
|  count|             20379|       20382|             20346|        20381|            20379|       20382|   20382|             20382| 20382|             20367|             20363|            20381|   20382|   20382|   20382|
|   mean| 42.94293144904068|        null| 18776

In [7]:

#5.- Asignar a los valores null de las columnas ganancia, perdidas, horas_trab, edad, años de educación la media, mediana o moda según sea la mejor opción. [10 ptos]
from pyspark.ml.feature import Imputer

imputer=Imputer(inputCols=["ganancia","perdidas","horas_trab","Edad","anos_educacion"],
                outputCols=["ganancia","perdidas","horas_trab","Edad","anos_educacion"])
model = imputer.fit(df)
df=model.transform(df)
df.show()



+-----------------+----------------+-------+-------------+--------------+-----------------+---------+------------------+------+--------+--------+----------+-------------+--------+-------+
|             Edad|    TipoEduccion|Ahorros|Nivel_escolar|anos_educacion|        profesion| relacion|              raza|  sexo|ganancia|perdidas|horas_trab|         pais|ingresos| clases|
+-----------------+----------------+-------+-------------+--------------+-----------------+---------+------------------+------+--------+--------+----------+-------------+--------+-------+
|             50.0|Self-emp-not-inc|  83311|    Bachelors|          13.0|  Exec-managerial|  Husband|             White|  Male|     0.0|     0.0|      13.0|United-States|   <=50K|casados|
|             53.0|         Private| 234721|         11th|           7.0|Handlers-cleaners|  Husband|             Black|  Male|     0.0|     0.0|      40.0|United-States|   <=50K|casados|
|             28.0|         Private| 338409|    Bachelors|  

In [8]:
#6. Crear columna calculada llamada rango_etario que permita clasificar a las personas según su edad.

#FORMA2
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import broadcast
from pyspark.sql.types import *
from pyspark.sql.functions import udf

df = df.selectExpr("*", ("case when Edad < 26  then 'Joven'"+
                                "when Edad < 35 then 'Adulto Joven'"+
                                "when Edad < 35 then 'Adulto Joven'"+
                                "when Edad < 60 then 'Adulto'"+
                                "else 'Adulto Mayor' end as rango_etario"))
df.show()


+-----------------+----------------+-------+-------------+--------------+-----------------+---------+------------------+------+--------+--------+----------+-------------+--------+-------+------------+
|             Edad|    TipoEduccion|Ahorros|Nivel_escolar|anos_educacion|        profesion| relacion|              raza|  sexo|ganancia|perdidas|horas_trab|         pais|ingresos| clases|rango_etario|
+-----------------+----------------+-------+-------------+--------------+-----------------+---------+------------------+------+--------+--------+----------+-------------+--------+-------+------------+
|             50.0|Self-emp-not-inc|  83311|    Bachelors|          13.0|  Exec-managerial|  Husband|             White|  Male|     0.0|     0.0|      13.0|United-States|   <=50K|casados|      Adulto|
|             53.0|         Private| 234721|         11th|           7.0|Handlers-cleaners|  Husband|             Black|  Male|     0.0|     0.0|      40.0|United-States|   <=50K|casados|      Adu

In [9]:
#7.- Eliminar las columnas Edad, TipoEducción y Profesión que cuentan con muchos valores null. [5 ptos]
df = df.dropna(subset = "Edad")
df = df.dropna(subset = "TipoEduccion")
df = df.dropna(subset = "profesion")

df.show()

+-----------------+----------------+-------+-------------+--------------+-----------------+---------+------------------+------+--------+--------+----------+-------------+--------+-------+------------+
|             Edad|    TipoEduccion|Ahorros|Nivel_escolar|anos_educacion|        profesion| relacion|              raza|  sexo|ganancia|perdidas|horas_trab|         pais|ingresos| clases|rango_etario|
+-----------------+----------------+-------+-------------+--------------+-----------------+---------+------------------+------+--------+--------+----------+-------------+--------+-------+------------+
|             50.0|Self-emp-not-inc|  83311|    Bachelors|          13.0|  Exec-managerial|  Husband|             White|  Male|     0.0|     0.0|      13.0|United-States|   <=50K|casados|      Adulto|
|             53.0|         Private| 234721|         11th|           7.0|Handlers-cleaners|  Husband|             Black|  Male|     0.0|     0.0|      40.0|United-States|   <=50K|casados|      Adu

In [10]:
#8. Eliminar las columnas ganancia y perdida que cuenta con muchos valores 0.
import numpy as np
df =  df.withColumn("ganancia",when(df.ganancia==0,np.nan).otherwise(df.ganancia))
df =  df.withColumn("perdidas",when(df.perdidas==0,np.nan).otherwise(df.perdidas))
df = df.dropna(subset = "ganancia")
df = df.dropna(subset = "perdidas")

df.show()

+----+------------+-------+-------------+--------------+-----------------+-------------+-----+------+---------------+---------------+----------+-------------+--------+--------+------------+
|Edad|TipoEduccion|Ahorros|Nivel_escolar|anos_educacion|        profesion|     relacion| raza|  sexo|       ganancia|       perdidas|horas_trab|         pais|ingresos|  clases|rango_etario|
+----+------------+-------+-------------+--------------+-----------------+-------------+-----+------+---------------+---------------+----------+-------------+--------+--------+------------+
|26.0|     Private| 256000|  Prof-school|          15.0|   Prof-specialty|      Husband|White|  Male|        99999.0|22861.240234375|      60.0|United-States|    >50K| casados|Adulto Joven|
|36.0|     Private| 183892| Some-college|          10.0|     Craft-repair|      Husband|White|  Male|         7298.0|22861.240234375|      44.0|United-States|    >50K| casados|      Adulto|
|37.0|     Private| 758700|          9th|         

In [11]:
#9. Convertir las variables categóricas del dataframe a numéricas: las variables a convertir son nivel_escolar, relación, raza, sexo, país, ingresos, rango_etario, clases.[
from pyspark.ml.feature import StringIndexer

df= StringIndexer(
    inputCol='Nivel_escolar',
    outputCol='Nivel_escolar_A',
    handleInvalid='keep').fit(df).transform(df)

df= StringIndexer(
    inputCol='relacion',
    outputCol='relacion_A',
    handleInvalid='keep').fit(df).transform(df)

df= StringIndexer(
    inputCol='raza',
    outputCol='raza_A',
    handleInvalid='keep').fit(df).transform(df)

df= StringIndexer(
    inputCol='sexo',
    outputCol='sexo_A',
    handleInvalid='keep').fit(df).transform(df)

df= StringIndexer(
    inputCol='pais',
    outputCol='pais_A',
    handleInvalid='keep').fit(df).transform(df)


df= StringIndexer(
    inputCol='ingresos',
    outputCol='ingresos_A',
    handleInvalid='keep').fit(df).transform(df)


df= StringIndexer(
    inputCol='rango_etario',
    outputCol='rango_etario_A',
    handleInvalid='keep').fit(df).transform(df)

df= StringIndexer(
    inputCol='clases',
    outputCol='clases_A',
    handleInvalid='keep').fit(df).transform(df)


df.show()

+----+------------+-------+-------------+--------------+-----------------+-------------+-----+------+---------------+---------------+----------+-------------+--------+--------+------------+---------------+----------+------+------+------+----------+--------------+--------+
|Edad|TipoEduccion|Ahorros|Nivel_escolar|anos_educacion|        profesion|     relacion| raza|  sexo|       ganancia|       perdidas|horas_trab|         pais|ingresos|  clases|rango_etario|Nivel_escolar_A|relacion_A|raza_A|sexo_A|pais_A|ingresos_A|rango_etario_A|clases_A|
+----+------------+-------+-------------+--------------+-----------------+-------------+-----+------+---------------+---------------+----------+-------------+--------+--------+------------+---------------+----------+------+------+------+----------+--------------+--------+
|26.0|     Private| 256000|  Prof-school|          15.0|   Prof-specialty|      Husband|White|  Male|        99999.0|22861.240234375|      60.0|United-States|    >50K| casados|Adult

In [12]:
#10. Eliminar las columnas de tipo categórico nivel_escolar, relación, raza, sexo, país, rango_edad,clases, dejando solo las nuevas variables numéricas creadas.

df=df.drop('Nivel_escolar')
df=df.drop('raza')
df=df.drop('sexo')
df=df.drop('pais')
df=df.drop('rango_etario')
df=df.drop('clases')
df.show()




+----+------------+-------+--------------+-----------------+-------------+---------------+---------------+----------+--------+---------------+----------+------+------+------+----------+--------------+--------+
|Edad|TipoEduccion|Ahorros|anos_educacion|        profesion|     relacion|       ganancia|       perdidas|horas_trab|ingresos|Nivel_escolar_A|relacion_A|raza_A|sexo_A|pais_A|ingresos_A|rango_etario_A|clases_A|
+----+------------+-------+--------------+-----------------+-------------+---------------+---------------+----------+--------+---------------+----------+------+------+------+----------+--------------+--------+
|26.0|     Private| 256000|          15.0|   Prof-specialty|      Husband|        99999.0|22861.240234375|      60.0|    >50K|            6.0|       0.0|   0.0|   0.0|   0.0|       1.0|           1.0|     0.0|
|36.0|     Private| 183892|          10.0|     Craft-repair|      Husband|         7298.0|22861.240234375|      44.0|    >50K|            0.0|       0.0|   0.0|

In [13]:
#11. Cambiar el nombre de la columna clases por label. [5 ptos]

df=df.withColumnRenamed("clases_A","label")

df.show()


+----+------------+-------+--------------+-----------------+-------------+---------------+---------------+----------+--------+---------------+----------+------+------+------+----------+--------------+-----+
|Edad|TipoEduccion|Ahorros|anos_educacion|        profesion|     relacion|       ganancia|       perdidas|horas_trab|ingresos|Nivel_escolar_A|relacion_A|raza_A|sexo_A|pais_A|ingresos_A|rango_etario_A|label|
+----+------------+-------+--------------+-----------------+-------------+---------------+---------------+----------+--------+---------------+----------+------+------+------+----------+--------------+-----+
|26.0|     Private| 256000|          15.0|   Prof-specialty|      Husband|        99999.0|22861.240234375|      60.0|    >50K|            6.0|       0.0|   0.0|   0.0|   0.0|       1.0|           1.0|  0.0|
|36.0|     Private| 183892|          10.0|     Craft-repair|      Husband|         7298.0|22861.240234375|      44.0|    >50K|            0.0|       0.0|   0.0|   0.0|   0.

In [14]:
#12. Crear vector assembler para crear modelos de aprendizaje.
from pyspark.ml.feature import VectorAssembler
caracteristicas=['Nivel_escolar_A',
                 'relacion_A',
                 'raza_A',
                 'sexo_A',
                 'pais_A',
                 'ingresos_A',
                 'label'

                 ]
assembler= VectorAssembler(inputCols=caracteristicas,outputCol='features')
df=assembler.transform(df)

df.show()

+----+------------+-------+--------------+-----------------+-------------+---------------+---------------+----------+--------+---------------+----------+------+------+------+----------+--------------+-----+--------------------+
|Edad|TipoEduccion|Ahorros|anos_educacion|        profesion|     relacion|       ganancia|       perdidas|horas_trab|ingresos|Nivel_escolar_A|relacion_A|raza_A|sexo_A|pais_A|ingresos_A|rango_etario_A|label|            features|
+----+------------+-------+--------------+-----------------+-------------+---------------+---------------+----------+--------+---------------+----------+------+------+------+----------+--------------+-----+--------------------+
|26.0|     Private| 256000|          15.0|   Prof-specialty|      Husband|        99999.0|22861.240234375|      60.0|    >50K|            6.0|       0.0|   0.0|   0.0|   0.0|       1.0|           1.0|  0.0| (7,[0,5],[6.0,1.0])|
|36.0|     Private| 183892|          10.0|     Craft-repair|      Husband|         7298.

In [15]:
#13. Realizar proceso de normalización a los datos [10 ptos]
from pyspark.ml.feature import Normalizer
from pyspark.ml.linalg import Vectors
normalizer = Normalizer(inputCol="features", outputCol="Features_N", p=1.0)
df = normalizer.transform(df)
df.show()

+----+------------+-------+--------------+-----------------+-------------+---------------+---------------+----------+--------+---------------+----------+------+------+------+----------+--------------+-----+--------------------+--------------------+
|Edad|TipoEduccion|Ahorros|anos_educacion|        profesion|     relacion|       ganancia|       perdidas|horas_trab|ingresos|Nivel_escolar_A|relacion_A|raza_A|sexo_A|pais_A|ingresos_A|rango_etario_A|label|            features|          Features_N|
+----+------------+-------+--------------+-----------------+-------------+---------------+---------------+----------+--------+---------------+----------+------+------+------+----------+--------------+-----+--------------------+--------------------+
|26.0|     Private| 256000|          15.0|   Prof-specialty|      Husband|        99999.0|22861.240234375|      60.0|    >50K|            6.0|       0.0|   0.0|   0.0|   0.0|       1.0|           1.0|  0.0| (7,[0,5],[6.0,1.0])|(7,[0,5],[0.85714...|
|36.

In [16]:
#14. Dividir el dataset en dos en una proporción del 80 20, 1 primer dataset entrena el modelo y se debe llamar train, el segundo valida el modelo y se debe llamar test. [5 ptos]
(train,test) = df.randomSplit([0.8,0.2])

In [17]:
#III.- Fase Modelado
#Se pide trabajar crear 6 modelos de aprendizaje los cuales serán: RandomForest, Árbol de decisión, NaiveBayes, regresión logística, BTCClassifier y MultilayerPerceptron.
#RandomForest
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(labelCol='label', 
                            featuresCol='Features_N',
                            maxDepth=5)

modelo = rf.fit(train)
predictions = modelo.transform(test)
#IV.- Evaluación [40 ptos]
#Deberá entregar las métricas de Accuracy y F1 para todos los modelos de aprendizajes [20 ptos]
#Seleccionar el modelo que se utilizará y dar las fundamentaciones de la razón. [20 ptos]

#RandomForest
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator2 = MulticlassClassificationEvaluator(labelCol='label', metricName='accuracy')
acura =evaluator2.evaluate(predictions)
print('RandomForest Test acuracy = ', acura)


evaluator1 = MulticlassClassificationEvaluator(labelCol='label', metricName='f1')
f1 = evaluator1.evaluate(predictions)
print('RandomForestRandomForest Test f1 = ', f1)


RandomForest Test acuracy =  0.6
RandomForestRandomForest Test f1 =  0.4499999999999999


In [18]:
#Arbol decision
from pyspark.ml.classification import  DecisionTreeClassifier
Tree = DecisionTreeClassifier (labelCol='label', 
                            featuresCol='Features_N')

modelo = Tree.fit(train)
predictions = modelo.transform(test)

#Árbol de decisión
evaluator2 = MulticlassClassificationEvaluator(labelCol='label', metricName='accuracy')
acura =evaluator2.evaluate(predictions)
print('Árbol de decisión acuracy = ', acura)

evaluator1 = MulticlassClassificationEvaluator(labelCol='label', metricName='f1')
f1 = evaluator1.evaluate(predictions)
print('Árbol de decisión f1 = ', f1)



Árbol de decisión acuracy =  0.6
Árbol de decisión f1 =  0.4499999999999999


In [19]:
#Naivabes
from pyspark.ml.classification import NaiveBayes
nb = NaiveBayes(featuresCol='Features_N', labelCol='label')
modelo = nb.fit(train)
predictions = modelo.transform(test)

#NaiveBayes
evaluator2 = MulticlassClassificationEvaluator(labelCol='label', metricName='accuracy')
acura =evaluator2.evaluate(predictions)
print('NaiveBayes Test acuracy = ', acura)

evaluator1 = MulticlassClassificationEvaluator(labelCol='label', metricName='f1')
f1 = evaluator1.evaluate(predictions)
print('NaiveBayes Test f1 = ', f1)


NaiveBayes Test acuracy =  0.6
NaiveBayes Test f1 =  0.4499999999999999


In [20]:
#regresión logística
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol='Features_N', labelCol='label')
modelo = lr.fit(train)
predictions = modelo.transform(test)

#Regresion Logica
evaluator2 = MulticlassClassificationEvaluator(labelCol='label', metricName='accuracy')
acura =evaluator2.evaluate(predictions)
print('Regresion Logica Test acuracy = ', acura)

evaluator1 = MulticlassClassificationEvaluator(labelCol='label', metricName='f1')
f1 = evaluator1.evaluate(predictions)
print('Regresion Logica Test f1 = ', f1)


Regresion Logica Test acuracy =  0.6
Regresion Logica Test f1 =  0.4499999999999999


In [21]:
#BTCClassifier
from pyspark.ml.classification import GBTClassifier
gbt = GBTClassifier(featuresCol='Features_N', labelCol='label')
modelo = lr.fit(train)
predictions = modelo.transform(test)


#BTCClassifier
evaluator2 = MulticlassClassificationEvaluator(labelCol='label', metricName='accuracy')
acura =evaluator2.evaluate(predictions)
print('BTCClassifier Test acuracy = ', acura)

evaluator1 = MulticlassClassificationEvaluator(labelCol='label', metricName='f1')
f1 = evaluator1.evaluate(predictions)
print('BTCClassifier Test f1 = ', f1)

BTCClassifier Test acuracy =  0.6
BTCClassifier Test f1 =  0.4499999999999999


In [22]:
#MultilayerPerceptron
from pyspark.ml.classification import MultilayerPerceptronClassifier
mpc = MultilayerPerceptronClassifier(featuresCol='Features_N', labelCol='label')
mpc_model = gbt.fit(train)


#MultilayerPerceptron
evaluator2 = MulticlassClassificationEvaluator(labelCol='label', metricName='accuracy')
acura =evaluator2.evaluate(predictions)
print('MultilayerPerceptron Test acuracy = ', acura)

evaluator1 = MulticlassClassificationEvaluator(labelCol='label', metricName='f1')
f1 = evaluator1.evaluate(predictions)
print('MultilayerPerceptron Test f1 = ', f1)

MultilayerPerceptron Test acuracy =  0.6
MultilayerPerceptron Test f1 =  0.4499999999999999


In [23]:
#IV.- Evaluación [40 ptos]
#Seleccionar el modelo que se utilizará y dar las fundamentaciones de la razón. [20 ptos]

print("la metrica a elegir es randomforest(aunque tambien sirve arbol de decision y NaiveBayes), ya que las demas no retorna lo necesitado ")

la metrica a elegir es randomforest(aunque tambien sirve arbol de decision y NaiveBayes), ya que las demas no retorna lo necesitado 
