# Importando os pacotes necessários

In [1]:
import os

from pyspark.conf import SparkConf
from pyspark.ml.classification import LinearSVC
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler, OneHotEncoder, StringIndexer, UnivariateFeatureSelector
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, when

# Iniciando sessão no Spark

In [2]:
# Base directory of the datasets, as a file URI (scheme + absolute path).
file_path = 'file:///home/jovyan/work/'

# The master URL was hard-coded to a Docker container id, which changes whenever the
# cluster container is recreated. Allow overriding it through an environment variable
# and keep the previous value as the fallback.
spark_master_url = os.environ.get('SPARK_MASTER_URL', 'spark://c56150f4d1d4:7077')
conf = SparkConf().setAll([
    ('spark.master', spark_master_url),
    ('spark.app.name', 'Análise De Dados Spark'),
])

# getOrCreate() returns the already-active session if there is one.
spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark

# Carregando dados no Spark

In [3]:
# Reader options: '?' is read as null, and any leading/trailing blanks around the
# fields are trimmed before type inference.
csv_options = dict(
    sep=',',
    header=True,
    inferSchema=True,
    encoding='ISO-8859-1',
    nullValue='?',
    ignoreLeadingWhiteSpace=True,
    ignoreTrailingWhiteSpace=True,
)
df = spark.read.csv(f'{file_path}Datasets/adult.csv', **csv_options)

# Expose the frame to spark.sql(...) queries under the name 'adult'.
df.createOrReplaceTempView('adult')
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)



# Prévia da base de dados

In [4]:
# Total number of rows, followed by the first 15 rows of the raw data.
total_rows = df.count()
print(total_rows)
df.show(n=15)

32561
+---+----------------+------+------------+-------------+--------------------+-----------------+-------------+------------------+------+------------+------------+--------------+--------------+------+
|age|       workclass|fnlwgt|   education|education-num|      marital-status|       occupation| relationship|              race|   sex|capital-gain|capital-loss|hours-per-week|native-country|income|
+---+----------------+------+------------+-------------+--------------------+-----------------+-------------+------------------+------+------------+------------+--------------+--------------+------+
| 39|       State-gov| 77516|   Bachelors|           13|       Never-married|     Adm-clerical|Not-in-family|             White|  Male|        2174|           0|            40| United-States| <=50K|
| 50|Self-emp-not-inc| 83311|   Bachelors|           13|  Married-civ-spouse|  Exec-managerial|      Husband|             White|  Male|           0|           0|            13| United-States| <=50K|

# Detectando quais colunas possuem valores nulos

In [5]:
# Count the nulls of every column in ONE aggregation. The previous loop launched a
# separate filter+count Spark job (a full scan of the data) for each column.
# when(isNull, 1) yields 1 for nulls and null otherwise, and count() skips nulls,
# so each aggregate is the number of null cells in that column.
null_counts = df.select([
    F.count(when(col(c).isNull(), 1)).alias(c) for c in df.columns
]).first().asDict()
columns_with_null = [c for c, nulls in null_counts.items() if nulls > 0]
columns_with_null

['workclass', 'occupation', 'native-country']

# Substituindo os valores ausentes (missing values)
- média: para variáveis numéricas
- moda: para variáveis categóricas

In [6]:
# Spark SQL type names (as reported by df.dtypes) that are imputed with the mean.
# The previous version only checked 'double', so integer columns were never imputed.
NUMERIC_TYPES = {'tinyint', 'smallint', 'int', 'bigint', 'float', 'double'}
col_types = dict(df.dtypes)

# NOTE(review): mode/mean are computed on the whole dataset, before the train/test
# split, so the test rows contribute to the imputation statistics.
for n in columns_with_null:
    if col_types[n] == 'string':
        # Mode = most frequent NON-null value. NULL must be excluded from the vote,
        # otherwise it can win and the "fill" would replace null with null.
        top = (df.where(col(n).isNotNull())
                 .groupBy(n).count()
                 .orderBy(col('count').desc())
                 .first())
        if top is None:  # column is entirely null: there is nothing to impute from
            continue
        moda = top[0]
        df = df.withColumn(n, when(col(n).isNull(), moda).otherwise(col(n)))
    elif col_types[n] in NUMERIC_TYPES:
        media = df.select(F.avg(col(n))).first()[0]
        # For integer columns the result is widened to a floating-point type.
        df = df.withColumn(n, when(col(n).isNull(), media).otherwise(col(n)))

# Re-check in a single aggregation: the list should now be empty.
null_counts = df.select([
    F.count(when(col(c).isNull(), 1)).alias(c) for c in df.columns
]).first().asDict()
columns_with_null = [c for c, nulls in null_counts.items() if nulls > 0]
columns_with_null

[]

# Transformando a variável native-country em binária
Optei por separar a população em nativos (United-States) e estrangeiros (Foreign-Country).

In [7]:
n = 'native-country'
# Values containing 'United-States' are kept as-is; every other country collapses
# into the single category 'Foreign-Country'.
is_us = col(n).contains('United-States')
df = df.withColumn(n, when(is_us, col(n)).otherwise('Foreign-Country'))

# Re-register the view so spark.sql sees the recoded column, then show the counts.
df.createOrReplaceTempView('adult')
spark.sql(f'select `{n}`, count(1) from adult group by `{n}`').show()

+---------------+--------+
| native-country|count(1)|
+---------------+--------+
|  United-States|   29753|
|Foreign-Country|    2808|
+---------------+--------+



# Transformando variáveis categóricas não ordinais em dummies

In [8]:
# Nominal (non-ordinal) categorical columns to one-hot encode. Intermediate and
# output column names are derived from this single list.
nominal_cols = ['workclass', 'marital-status', 'occupation', 'relationship', 'race']
index_cols = [f'{c}-num' for c in nominal_cols]
vector_cols = [f'{c}-vec' for c in nominal_cols]

# Step 1: map each category string to a numeric index, then drop the raw strings.
# 'education' is dropped without encoding (presumably redundant with 'education-num').
indexer = StringIndexer(inputCols=nominal_cols, outputCols=index_cols)
df2 = indexer.fit(df).transform(df).drop('education', *nominal_cols)

# Step 2: turn every index into a sparse one-hot vector, then drop the indices.
one = OneHotEncoder(inputCols=index_cols, outputCols=vector_cols)
df2 = one.fit(df2).transform(df2).drop(*index_cols)

# Transformando variáveis categóricas binárias em 0s e 1s

In [9]:
# Binary categorical columns become 0/1 integers; anything not matched (including
# null) falls into otherwise(1).
# sex: exact comparison. The previous substring test `contains('Male')` only avoided
# matching 'Female' because the comparison is case-sensitive.
df2 = df2.withColumn('sex-num', when(col('sex') == 'Male', 0).otherwise(1))
df2 = df2.withColumn('native-country-num', when(col('native-country').contains('United-States'), 0).otherwise(1))
# Label encoding: 0 = values containing '>50K', 1 = everything else (e.g. '<=50K').
df2 = df2.withColumn('income-num', when(col('income').contains('>50K'), 0).otherwise(1))
df2 = df2.drop('sex', 'native-country', 'income')
df2.show(5)

+---+------+-------------+------------+------------+--------------+-------------+------------------+--------------+----------------+-------------+-------+------------------+----------+
|age|fnlwgt|education-num|capital-gain|capital-loss|hours-per-week|workclass-vec|marital-status-vec|occupation-vec|relationship-vec|     race-vec|sex-num|native-country-num|income-num|
+---+------+-------------+------------+------------+--------------+-------------+------------------+--------------+----------------+-------------+-------+------------------+----------+
| 39| 77516|           13|        2174|           0|            40|(7,[3],[1.0])|     (6,[1],[1.0])|(13,[3],[1.0])|   (5,[1],[1.0])|(4,[0],[1.0])|      0|                 0|         1|
| 50| 83311|           13|           0|           0|            13|(7,[1],[1.0])|     (6,[0],[1.0])|(13,[2],[1.0])|   (5,[0],[1.0])|(4,[0],[1.0])|      0|                 0|         1|
| 38|215646|            9|           0|           0|            40|(7,[0],[

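# Fazendo a seleção de atributos antes de treinar o modelo.
Obs.: o seletor é ajustado com a base inteira, antes da divisão treino/teste; para evitar vazamento de dados, o ideal seria ajustá-lo apenas no conjunto de treino.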

In [10]:
label_col = 'income-num'

# Assemble every column except the label. Excluding it by name (instead of
# df2.columns[:-1]) no longer depends on the label being the last column.
feature_cols = [c for c in df2.columns if c != label_col]
# handleInvalid='skip' drops rows with invalid/null inputs; after imputation none are expected.
vec_assembler = VectorAssembler(inputCols=feature_cols, outputCol='features', handleInvalid='skip')
df3 = vec_assembler.transform(df2).select('features', label_col)

# The label is a class (0/1), so it is declared categorical. With continuous features,
# Spark then applies the ANOVA F-test rather than the regression F-value. For a binary
# label the two are expected to yield the same p-values; the declaration matters if the
# label ever gains more classes.
# 'fpr' keeps features whose p-value is below selectionThreshold (left at its default).
# NOTE(review): the selector is fit on the full dataset before the train/test split,
# so the test rows influence which features are kept (data leakage).
selector = UnivariateFeatureSelector(featuresCol='features', outputCol='selected_features',
                                     labelCol=label_col, selectionMode='fpr')
selector.setFeatureType('continuous').setLabelType('categorical')
result = selector.fit(df3).transform(df3)
result.show(10)
result = result.drop('features')

+--------------------+----------+--------------------+
|            features|income-num|   selected_features|
+--------------------+----------+--------------------+
|(43,[0,1,2,3,5,9,...|         1|(41,[0,1,2,4,8,13...|
|(43,[0,1,2,5,7,13...|         1|(41,[0,1,4,6,12,2...|
|(43,[0,1,2,5,6,15...|         1|(41,[0,1,4,5,14,2...|
|(43,[0,1,2,5,6,13...|         1|(41,[0,1,4,5,12,2...|
|(43,[0,1,2,5,6,13...|         1|(41,[0,1,4,5,12,1...|
|(43,[0,1,2,5,6,13...|         1|(41,[0,1,4,5,12,2...|
|(43,[0,1,2,5,6,18...|         1|(41,[0,1,4,5,17,2...|
|(43,[0,1,2,5,7,13...|         0|(41,[0,1,4,6,12,2...|
|(43,[0,1,2,3,5,6,...|         0|(41,[0,1,2,4,5,13...|
|(43,[0,1,2,3,5,6,...|         0|(41,[0,1,2,4,5,12...|
+--------------------+----------+--------------------+
only showing top 10 rows



# Aplicando o SVM sobre as Features selecionadas

In [11]:
# 80/20 train/test split with a fixed seed.
train, test = result.randomSplit([0.8, 0.2], seed=42)

# Linear SVM on the selected features, with fixed hyper-parameters.
svm = LinearSVC(featuresCol='selected_features', labelCol='income-num',
                maxIter=100, regParam=0.1)
svm_model = svm.fit(train)
predictions = svm_model.transform(test)
predictions.show(10)

+----------+--------------------+--------------------+----------+
|income-num|   selected_features|       rawPrediction|prediction|
+----------+--------------------+--------------------+----------+
|         0|(41,[0,1,2,4,5,12...|[-0.0912079966354...|       1.0|
|         0|(41,[0,1,2,4,5,12...|[0.61099781070448...|       0.0|
|         0|(41,[0,1,2,4,5,12...|[4.99189728201738...|       0.0|
|         0|(41,[0,1,2,4,5,12...|[0.20455142387920...|       0.0|
|         0|(41,[0,1,2,4,5,12...|[5.08403115384511...|       0.0|
|         0|(41,[0,1,2,4,5,12...|[0.17798751749186...|       0.0|
|         0|(41,[0,1,2,4,5,12...|[0.04780746964519...|       0.0|
|         0|(41,[0,1,2,4,5,12...|[0.85951165310034...|       0.0|
|         0|(41,[0,1,2,4,5,12...|[5.69050817308497...|       0.0|
|         0|(41,[0,1,2,4,5,12...|[5.82238797831450...|       0.0|
+----------+--------------------+--------------------+----------+
only showing top 10 rows



# Calculando acurácia do modelo

In [12]:
# Accuracy = fraction of test rows whose 'prediction' equals the 'income-num' label.
evaluator = MulticlassClassificationEvaluator(
    labelCol='income-num',
    predictionCol='prediction',
    metricName='accuracy',
)
accuracy = evaluator.evaluate(predictions)
print(accuracy)

0.8420971472629144


# Aplicando Tuning sobre o modelo

In [13]:
svm = LinearSVC(featuresCol='selected_features', labelCol='income-num')

# 2 x 2 grid -> 4 candidate models, each trained once per fold.
paramGrid = (ParamGridBuilder()
             .addGrid(svm.maxIter, [100, 500])
             .addGrid(svm.regParam, [0.01, 0.1])
             .build())
acc_eval = MulticlassClassificationEvaluator(labelCol='income-num', predictionCol='prediction',
                                             metricName='accuracy')

# seed fixes the random fold assignment. Without it, the chosen parameters (and the
# accuracy printed below) can change from run to run.
crossval = CrossValidator(estimator=svm,
                          estimatorParamMaps=paramGrid,
                          evaluator=acc_eval,
                          numFolds=10,
                          parallelism=2,
                          seed=42)

# Cross-validation only sees the training split. bestModel (the best parameter
# combination, refit on all of `train`) is then scored once on the held-out test split.
cvModel = crossval.fit(train)
predictions = cvModel.bestModel.transform(test)
accuracy = acc_eval.evaluate(predictions)
print(accuracy)

0.8493446414803393
