In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

In [3]:
spark = SparkSession.builder.appName("Predict Adult Salary").getOrCreate()

In [4]:
schema = StructType([
    StructField("age", IntegerType(), True),
    StructField("workclass", StringType(), True),
    StructField("fnlwgt", IntegerType(), True),
    StructField("education", StringType(), True),
    StructField("education-num", IntegerType(), True),
    StructField("marital-status", StringType(), True),
    StructField("occupation", StringType(), True),
    StructField("relationship", StringType(), True),
    StructField("race", StringType(), True),
    StructField("sex", StringType(), True),
    StructField("capital-gain", IntegerType(), True),
    StructField("capital-loss", IntegerType(), True),
    StructField("hours-per-week", IntegerType(), True),
    StructField("native-country", StringType(), True),
    StructField("salary", StringType(), True)
])

In [5]:
train_df = spark.read.csv('train.csv', header=False, schema=schema)
test_df = spark.read.csv('test.csv', header=False, schema=schema)

In [6]:
test_df.head(5)

[Row(age=25, workclass='Private', fnlwgt=226802, education='11th', education-num=7, marital-status='Never-married', occupation='Machine-op-inspct', relationship='Own-child', race='Black', sex='Male', capital-gain=0, capital-loss=0, hours-per-week=40, native-country='United-States', salary='>50K'),
 Row(age=38, workclass='Private', fnlwgt=89814, education='HS-grad', education-num=9, marital-status='Married-civ-spouse', occupation='Farming-fishing', relationship='Husband', race='White', sex='Male', capital-gain=0, capital-loss=0, hours-per-week=50, native-country='United-States', salary='>50K'),
 Row(age=28, workclass='Local-gov', fnlwgt=336951, education='Assoc-acdm', education-num=12, marital-status='Married-civ-spouse', occupation='Protective-serv', relationship='Husband', race='White', sex='Male', capital-gain=0, capital-loss=0, hours-per-week=40, native-country='United-States', salary='>50K'),
 Row(age=44, workclass='Private', fnlwgt=160323, education='Some-college', education-num=1

In [7]:
train_df.limit(5).toPandas()

Unnamed: 0,age,workclass,fnlwgt,education,education-num,marital-status,occupation,relationship,race,sex,capital-gain,capital-loss,hours-per-week,native-country,salary
0,39,State-gov,77516,Bachelors,13,Never-married,Adm-clerical,Not-in-family,White,Male,2174,0,40,United-States,<=50K
1,50,Self-emp-not-inc,83311,Bachelors,13,Married-civ-spouse,Exec-managerial,Husband,White,Male,0,0,13,United-States,<=50K
2,38,Private,215646,HS-grad,9,Divorced,Handlers-cleaners,Not-in-family,White,Male,0,0,40,United-States,<=50K
3,53,Private,234721,11th,7,Married-civ-spouse,Handlers-cleaners,Husband,Black,Male,0,0,40,United-States,<=50K
4,28,Private,338409,Bachelors,13,Married-civ-spouse,Prof-specialty,Wife,Black,Female,0,0,40,Cuba,<=50K


In [8]:
#convertimos variables categóricas en númericas

categorical_variables = ['workclass', 'education', 'marital-status', 'occupation', 'relationship', 'race', 'sex', 'native-country']
indexers = [StringIndexer(inputCol=column, outputCol=column+"-index") for column in categorical_variables]


In [9]:
encoder = OneHotEncoderEstimator(
    inputCols=[indexer.getOutputCol() for indexer in indexers],
    outputCols=["{0}-encoded".format(indexer.getOutputCol()) for indexer in indexers]
)

In [10]:
assembler = VectorAssembler(
    inputCols=encoder.getOutputCols(),
    outputCol="categorical-features"
)


In [11]:
pipeline = Pipeline(stages=indexers + [encoder, assembler])
train_df = pipeline.fit(train_df).transform(train_df)
test_df = pipeline.fit(test_df).transform(test_df)

In [12]:
test_df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- salary: string (nullable = true)
 |-- workclass-index: double (nullable = false)
 |-- education-index: double (nullable = false)
 |-- marital-status-index: double (nullable = false)
 |-- occupation-index: double (nullable = false)
 |-- relationship-index: double (nullable = false)
 |-- race-index: double (nullable = false)
 |-- sex-index: double (nullable = false)
 |-- native-country-index: double 

In [13]:
df = train_df.limit(5).toPandas()
df['categorical-features'][1]

SparseVector(93, {1: 1.0, 10: 1.0, 23: 1.0, 31: 1.0, 43: 1.0, 48: 1.0, 52: 1.0, 53: 1.0})

In [14]:
#Unimos las variables numéricas en una sola columna
continuous_variables = ['age', 'fnlwgt', 'education-num', 'capital-gain', 'capital-loss', 'hours-per-week']
assembler = VectorAssembler(
    inputCols=['categorical-features', *continuous_variables],
    outputCol='features'
)
train_df = assembler.transform(train_df)
test_df = assembler.transform(test_df)

In [15]:
train_df.limit(5).toPandas()['features'][1]

SparseVector(99, {1: 1.0, 10: 1.0, 23: 1.0, 31: 1.0, 43: 1.0, 48: 1.0, 52: 1.0, 53: 1.0, 93: 50.0, 94: 83311.0, 95: 13.0, 98: 13.0})

In [17]:
#Finalmente, creamos la columna Label como salary
indexer = StringIndexer(inputCol='salary', outputCol='label')
train_df = indexer.fit(train_df).transform(train_df)
test_df = indexer.fit(test_df).transform(test_df)
train_df.limit(10).toPandas()['label']

0    0.0
1    0.0
2    0.0
3    0.0
4    0.0
5    0.0
6    0.0
7    1.0
8    1.0
9    1.0
Name: label, dtype: float64

In [18]:
test_df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- salary: string (nullable = true)
 |-- workclass-index: double (nullable = false)
 |-- education-index: double (nullable = false)
 |-- marital-status-index: double (nullable = false)
 |-- occupation-index: double (nullable = false)
 |-- relationship-index: double (nullable = false)
 |-- race-index: double (nullable = false)
 |-- sex-index: double (nullable = false)
 |-- native-country-index: double 

In [19]:
#Ajustamos y entrenamos nuestro modelo
lr = LogisticRegression(featuresCol='features', labelCol='label')
model = lr.fit(train_df)

In [20]:
test_df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- salary: string (nullable = true)
 |-- workclass-index: double (nullable = false)
 |-- education-index: double (nullable = false)
 |-- marital-status-index: double (nullable = false)
 |-- occupation-index: double (nullable = false)
 |-- relationship-index: double (nullable = false)
 |-- race-index: double (nullable = false)
 |-- sex-index: double (nullable = false)
 |-- native-country-index: double 

In [21]:
#El método transform es usado para hacer predicciones al conjunto de prueba
pred = model.transform(test_df)

In [22]:
pred.limit(10).toPandas()[['label', 'prediction']]
# o
# pred.select("prediction", "label").show(5)

Unnamed: 0,label,prediction
0,0.0,0.0
1,0.0,0.0
2,0.0,0.0
3,0.0,1.0
4,0.0,0.0
5,0.0,0.0
6,0.0,0.0
7,0.0,1.0
8,0.0,0.0
9,0.0,0.0


In [23]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

#Obtenemos la precisión del modelo
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(pred)
print("Test Error = %g " % (1.0 - accuracy))

Test Error = 0.189853 
