# Imports

In [72]:
from pyspark.sql import SparkSession
import pandas as ps
from pyspark.sql import functions as F
 # mean, col, split, col, regexp_extract, when, lit

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
from pyspark.sql.types import FloatType
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.classification import LinearSVC

from matplotlib import pyplot as plt
import warnings
# Silence Python-level warnings for the whole notebook. This does not affect
# the JVM's WARN log lines, which still show up in the cell outputs.
warnings.simplefilter('ignore')

from pyspark import SparkContext
from pyspark.sql import SparkSession

# Reuse the running SparkSession if there is one, otherwise start one.
session_builder = SparkSession.builder
spark = session_builder.getOrCreate()
spark

# Lecture
### affichage

In [27]:
# Raw, uncleaned CSV: missing values appear as '?' (e.g. workclass, occupation).
income_adult = "./adult.csv"

# Spark copy, used by the ML pipeline; cached since several later cells scan it.
sdf = spark.read.csv(income_adult, header=True, inferSchema=True).cache()
# Preview of the dataset (prints the first rows).
sdf.show()

# pandas copy, used only for quick exploration. Kept as the LAST expression so
# that Jupyter actually renders its head (a non-final bare expression is dropped).
df = ps.read_csv(income_adult)
df.head()

+---+----------------+------+------------+---------------+------------------+-----------------+-------------+------------------+------+------------+------------+--------------+--------------+------+
|age|       workclass|fnlwgt|   education|educational-num|    marital-status|       occupation| relationship|              race|gender|capital-gain|capital-loss|hours-per-week|native-country|income|
+---+----------------+------+------------+---------------+------------------+-----------------+-------------+------------------+------+------------+------------+--------------+--------------+------+
| 25|         Private|226802|        11th|              7|     Never-married|Machine-op-inspct|    Own-child|             Black|  Male|           0|           0|            40| United-States| <=50K|
| 38|         Private| 89814|     HS-grad|              9|Married-civ-spouse|  Farming-fishing|      Husband|             White|  Male|           0|           0|            50| United-States| <=50K|
| 28|

22/10/06 09:45:40 WARN CacheManager: Asked to cache already cached data.


### nombre (lignes & colonnes)

In [28]:

# A bare expression that is not the last line of a cell is never displayed,
# so the cache status is printed explicitly.
print("sdf cached:", sdf.is_cached)
print(sdf.count())   # number of rows
len(df.columns)      # number of columns (last expression -> displayed)

48842


15

### description

In [29]:
# Summary statistics of the numeric columns only: describe() on string columns
# gives meaningless mean/stddev. The previous full describe().toPandas() ran a
# Spark job whose result was never displayed, so it has been dropped.
numeric_cols = [name for name, dtype in sdf.dtypes if dtype != 'string']
sdf.select(numeric_cols).describe().show()

                                                                                

+-------+------------------+------------------+------------------+------------------+------------------+------------------+
|summary|               age|            fnlwgt|   educational-num|      capital-gain|      capital-loss|    hours-per-week|
+-------+------------------+------------------+------------------+------------------+------------------+------------------+
|  count|             48842|             48842|             48842|             48842|             48842|             48842|
|   mean| 38.64358543876172|189664.13459727284|10.078088530363212|1079.0676262233324| 87.50231358257237|40.422382375824085|
| stddev|13.710509934443518|105604.02542315758| 2.570972755592256| 7452.019057655418|403.00455212435924|12.391444024252296|
|    min|                17|             12285|                 1|                 0|                 0|                 1|
|    max|                90|           1490400|                16|             99999|              4356|                99|
+-------

# code
### indexage

In [35]:
# Categorical (object dtype) columns of the pandas copy; describe() alone only
# summarises the numeric ones, so the categorical columns are selected explicitly.
df_dtypes = df.dtypes
cols_cat = df_dtypes[df_dtypes == 'object'].index

# Target column and the two groups of input features used by every model below.
labelCol = 'income'
feature_numeric = ['age', 'fnlwgt', 'educational-num', 'capital-gain', 'capital-loss', 'hours-per-week']
feature_cat = ['workclass', 'education', 'marital-status', 'occupation', 'relationship', 'race', 'gender', 'native-country']

# Fail fast if a listed column is missing from the Spark DataFrame.
missing_cols = set([labelCol] + feature_numeric + feature_cat) - set(sdf.columns)
assert not missing_cols, f"columns not found in sdf: {sorted(missing_cols)}"

# Output column names of the StringIndexer stage, then of the OneHotEncoder stage.
feature_cat_indexed = [col + '_indexed' for col in feature_cat]
feature_cat_encoded = [col + '_encoded' for col in feature_cat_indexed]
print(feature_cat_indexed)
print(feature_cat_encoded)

# Last expression: rendered summary of the categorical columns.
df[cols_cat].describe()

['workclass_indexed', 'education_indexed', 'marital-status_indexed', 'occupation_indexed', 'relationship_indexed', 'race_indexed', 'gender_indexed', 'native-country_indexed']
['workclass_indexed_encoded', 'education_indexed_encoded', 'marital-status_indexed_encoded', 'occupation_indexed_encoded', 'relationship_indexed_encoded', 'race_indexed_encoded', 'gender_indexed_encoded', 'native-country_indexed_encoded']


### string indexer

In [36]:
# Index the categorical features and the label; handleInvalid='skip' drops rows
# whose value cannot be indexed instead of raising.
indexer_feature = StringIndexer(inputCols=feature_cat, handleInvalid='skip', outputCols=feature_cat_indexed)
indexer_label = StringIndexer(inputCol=labelCol, handleInvalid='skip', outputCol=labelCol+'_indexed')

# Only the feature indexer is applied here, for inspection; the label indexer is
# used later inside the Pipeline. Guarded so that re-running this cell does not
# fail because the *_indexed output columns already exist in sdf.
if not set(feature_cat_indexed).issubset(sdf.columns):
    sdf = indexer_feature.fit(sdf).transform(sdf)
sdf.show(n=1, truncate=False, vertical=True)

-RECORD 0-----------------------------------
 age                    | 25                
 workclass              | Private           
 fnlwgt                 | 226802            
 education              | 11th              
 educational-num        | 7                 
 marital-status         | Never-married     
 occupation             | Machine-op-inspct 
 relationship           | Own-child         
 race                   | Black             
 gender                 | Male              
 capital-gain           | 0                 
 capital-loss           | 0                 
 hours-per-week         | 40                
 native-country         | United-States     
 income                 | <=50K             
 workclass_indexed      | 0.0               
 education_indexed      | 5.0               
 marital-status_indexed | 1.0               
 occupation_indexed     | 6.0               
 relationship_indexed   | 2.0               
 race_indexed           | 1.0               
 gender_in

### encoders

In [37]:
# One-hot encode the indexed categories. dropLast=False keeps one slot per
# category (e.g. workclass -> vectors of size 9 in the output below).
encoders = OneHotEncoder(dropLast=False, inputCols=feature_cat_indexed, outputCols=feature_cat_encoded)

# Guarded so that re-running this cell does not fail because the *_encoded
# output columns already exist in sdf.
if not set(feature_cat_encoded).issubset(sdf.columns):
    sdf = encoders.fit(sdf).transform(sdf)
sdf.select(feature_cat_indexed + feature_cat_encoded).show(n=2, truncate=False, vertical=True)

-RECORD 0-----------------------------------------
 workclass_indexed              | 0.0             
 education_indexed              | 5.0             
 marital-status_indexed         | 1.0             
 occupation_indexed             | 6.0             
 relationship_indexed           | 2.0             
 race_indexed                   | 1.0             
 gender_indexed                 | 0.0             
 native-country_indexed         | 0.0             
 workclass_indexed_encoded      | (9,[0],[1.0])   
 education_indexed_encoded      | (16,[5],[1.0])  
 marital-status_indexed_encoded | (7,[1],[1.0])   
 occupation_indexed_encoded     | (15,[6],[1.0])  
 relationship_indexed_encoded   | (6,[2],[1.0])   
 race_indexed_encoded           | (5,[1],[1.0])   
 gender_indexed_encoded         | (2,[0],[1.0])   
 native-country_indexed_encoded | (42,[0],[1.0])  
-RECORD 1-----------------------------------------
 workclass_indexed              | 0.0             
 education_indexed             

### vector assembler

In [38]:
# Reload the raw CSV into sdf: the exploration cells above appended the
# *_indexed / *_encoded columns to it, and the Pipeline below presumably needs
# to start from the raw columns only.
sdf = (
    spark.read
    .csv(income_adult, header=True, inferSchema=True)
    .cache()
)

# Concatenate the one-hot vectors and the numeric columns into 'features'.
assembler = VectorAssembler(
    inputCols=feature_cat_encoded + feature_numeric,
    outputCol='features',
)


22/10/06 09:57:27 WARN CacheManager: Asked to cache already cached data.


### pipeline

In [39]:
# Sanity check of the whole preprocessing chain (no model yet): fit every stage
# on the raw data and show one transformed row.
preprocessing = Pipeline(stages=[indexer_feature, indexer_label, encoders, assembler])
preprocessing.fit(sdf).transform(sdf).show(n=1, truncate=False, vertical=True)

22/10/06 09:57:28 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


-RECORD 0---------------------------------------------------------------------------------------------------------------------------------
 age                            | 25                                                                                                      
 workclass                      | Private                                                                                                 
 fnlwgt                         | 226802                                                                                                  
 education                      | 11th                                                                                                    
 educational-num                | 7                                                                                                       
 marital-status                 | Never-married                                                                                           
 occupation                

### séparation des données test / train

In [42]:
# Exploratory 70/30 split (seed 11) with a one-row preview of each side.
# NOTE: train/test are re-created from a fresh read with seed 5 in the
# logistic-regression section; that later split is the one the models use.
train, test = sdf.randomSplit([0.7, 0.3], seed=11)
for part in (train, test):
    part.show(n=1, truncate=False, vertical=True)

-RECORD 0------------------------
 age             | 17            
 workclass       | ?             
 fnlwgt          | 27251         
 education       | 11th          
 educational-num | 7             
 marital-status  | Widowed       
 occupation      | ?             
 relationship    | Own-child     
 race            | White         
 gender          | Male          
 capital-gain    | 0             
 capital-loss    | 0             
 hours-per-week  | 40            
 native-country  | United-States 
 income          | <=50K         
only showing top 1 row

-RECORD 0------------------------
 age             | 17            
 workclass       | ?             
 fnlwgt          | 44789         
 education       | Some-college  
 educational-num | 10            
 marital-status  | Never-married 
 occupation      | ?             
 relationship    | Own-child     
 race            | White         
 gender          | Male          
 capital-gain    | 0             
 capital-loss    | 0    

# LOGISTIC REGRESSION ALGORITHM

In [43]:
lr = LogisticRegression(labelCol='income_indexed', featuresCol='features')

# Fresh 70/30 split of the raw CSV (seed 5). It replaces the exploratory split
# above and is the train/test pair every model below is fitted/evaluated on.
raw_income = spark.read.csv(income_adult, header=True, inferSchema=True).cache()
train, test = raw_income.randomSplit([0.7, 0.3], seed=5)

# Both splits are reused by several pipelines, so keep them in memory.
train.cache(), test.cache()

22/10/06 12:10:24 WARN CacheManager: Asked to cache already cached data.
22/10/06 12:10:24 WARN CacheManager: Asked to cache already cached data.
22/10/06 12:10:24 WARN CacheManager: Asked to cache already cached data.


(DataFrame[age: int, workclass: string, fnlwgt: int, education: string, educational-num: int, marital-status: string, occupation: string, relationship: string, race: string, gender: string, capital-gain: int, capital-loss: int, hours-per-week: int, native-country: string, income: string],
 DataFrame[age: int, workclass: string, fnlwgt: int, education: string, educational-num: int, marital-status: string, occupation: string, relationship: string, race: string, gender: string, capital-gain: int, capital-loss: int, hours-per-week: int, native-country: string, income: string])

In [45]:
# Full pipeline: preprocessing stages followed by the logistic regression.
lr_stages = [indexer_feature, indexer_label, encoders, assembler, lr]
model = Pipeline(stages=lr_stages).fit(train)

pred_lr = model.transform(test)
pred_lr.select('prediction', 'income_indexed', 'features').show()

+----------+--------------+--------------------+
|prediction|income_indexed|            features|
+----------+--------------+--------------------+
|       0.0|           0.0|(108,[3,14,26,39,...|
|       0.0|           0.0|(108,[3,9,26,39,4...|
|       0.0|           0.0|(108,[3,16,26,39,...|
|       0.0|           0.0|(108,[3,10,26,39,...|
|       0.0|           0.0|(108,[3,14,26,39,...|
|       0.0|           0.0|(108,[3,16,26,39,...|
|       0.0|           0.0|(108,[3,16,26,39,...|
|       0.0|           0.0|(108,[3,14,26,39,...|
|       0.0|           0.0|(108,[3,16,26,39,...|
|       0.0|           0.0|(108,[3,16,26,39,...|
|       0.0|           0.0|(108,[3,16,26,39,...|
|       0.0|           0.0|(108,[3,20,26,39,...|
|       0.0|           0.0|(108,[3,14,26,39,...|
|       0.0|           0.0|(108,[3,16,26,39,...|
|       0.0|           0.0|(108,[3,14,26,39,...|
|       0.0|           0.0|(108,[3,20,26,39,...|
|       0.0|           0.0|(108,[3,14,26,39,...|
|       0.0|        

### sélection des colonnes de prédiction

In [52]:
# (prediction, label) pairs for MulticlassMetrics, with the label cast to a
# floating-point type. No orderBy: the metrics do not depend on row order, so
# sorting only added a full shuffle.
preds_and_labels = pred_lr.select(
    'prediction',
    F.col('income_indexed').cast(FloatType()).alias('label'),
)

                                                                                

### Affichage de la matrice de confusion + précision

In [55]:
# Convert each Row into a plain (prediction, label) tuple for the RDD-based API.
prediction_label_rdd = preds_and_labels.rdd.map(tuple)
metrics = MulticlassMetrics(prediction_label_rdd)
print(metrics.confusionMatrix().toArray())

[[10368.   742.]
 [ 1434.  2125.]]


In [56]:
# Overall accuracy of the logistic regression on the test split (displayed).
lr_accuracy = metrics.accuracy
lr_accuracy

0.8516599631876747

# DECISION TREE ALGORITHM

In [59]:
# Accuracy evaluator; defined here but also reused for the later models.
evaluator = MulticlassClassificationEvaluator(
    labelCol='income_indexed',
    predictionCol='prediction',
    metricName='accuracy',
)
tree = DecisionTreeClassifier(labelCol='income_indexed', featuresCol='features')

tree_stages = [indexer_feature, indexer_label, encoders, assembler, tree]
model = Pipeline(stages=tree_stages).fit(train)

pred_tree = model.transform(test)
pred_tree.select('prediction', 'income_indexed', 'features').show()

+----------+--------------+--------------------+
|prediction|income_indexed|            features|
+----------+--------------+--------------------+
|       0.0|           0.0|(108,[3,14,26,39,...|
|       0.0|           0.0|(108,[3,9,26,39,4...|
|       0.0|           0.0|(108,[3,16,26,39,...|
|       0.0|           0.0|(108,[3,10,26,39,...|
|       0.0|           0.0|(108,[3,14,26,39,...|
|       0.0|           0.0|(108,[3,16,26,39,...|
|       0.0|           0.0|(108,[3,16,26,39,...|
|       0.0|           0.0|(108,[3,14,26,39,...|
|       0.0|           0.0|(108,[3,16,26,39,...|
|       0.0|           0.0|(108,[3,16,26,39,...|
|       0.0|           0.0|(108,[3,16,26,39,...|
|       0.0|           0.0|(108,[3,20,26,39,...|
|       0.0|           0.0|(108,[3,14,26,39,...|
|       0.0|           0.0|(108,[3,16,26,39,...|
|       0.0|           0.0|(108,[3,14,26,39,...|
|       0.0|           0.0|(108,[3,20,26,39,...|
|       0.0|           0.0|(108,[3,14,26,39,...|
|       0.0|        

### Matrice de confusion + précision

In [60]:
# (prediction, label) pairs for MulticlassMetrics; no orderBy, since row order
# does not affect the metrics and sorting only added a shuffle.
preds_and_labels = pred_tree.select(
    'prediction',
    F.col('income_indexed').cast(FloatType()).alias('label'),
)
metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))
print(metrics.confusionMatrix().toArray())

[[10457.   653.]
 [ 1676.  1883.]]


In [61]:
# Accuracy of the decision tree and the matching error rate.
accuracy_tree = evaluator.evaluate(pred_tree)
error_rate_tree = 1.0 - accuracy_tree
print(f"Précision = {accuracy_tree:g}")
print(f"Taux d'erreur = {error_rate_tree:g} ")

Précision = 0.84123
Taux d'erreur = 0.15877 


# NAIVE BAYES ALGORITHM

#### Hypothèse (« naïve ») : les features sont supposées indépendantes entre elles, conditionnellement à la classe
#### Naive Bayes repose sur un calcul de probabilités (théorème de Bayes)
#### Pour chaque donnée, il calcule la probabilité qu'elle appartienne à chaque classe (ici `<=50K` et `>50K`)
#### En comparant ces probabilités, on affecte la donnée à la classe la plus probable


In [63]:
# NOTE(review): Spark's default NaiveBayes model type needs non-negative
# features; that holds here per the describe() minima, but the raw magnitude of
# fnlwgt may dominate the other features — TODO consider scaling.
nb = NaiveBayes(labelCol="income_indexed", featuresCol="features")

nb_stages = [indexer_feature, indexer_label, encoders, assembler, nb]
nb_model = Pipeline(stages=nb_stages).fit(train)

nb_prediction = nb_model.transform(test)
nb_prediction.select("prediction", "income_indexed", "features").show()



+----------+--------------+--------------------+
|prediction|income_indexed|            features|
+----------+--------------+--------------------+
|       0.0|           0.0|(108,[3,14,26,39,...|
|       0.0|           0.0|(108,[3,9,26,39,4...|
|       0.0|           0.0|(108,[3,16,26,39,...|
|       0.0|           0.0|(108,[3,10,26,39,...|
|       0.0|           0.0|(108,[3,14,26,39,...|
|       0.0|           0.0|(108,[3,16,26,39,...|
|       0.0|           0.0|(108,[3,16,26,39,...|
|       0.0|           0.0|(108,[3,14,26,39,...|
|       0.0|           0.0|(108,[3,16,26,39,...|
|       0.0|           0.0|(108,[3,16,26,39,...|
|       0.0|           0.0|(108,[3,16,26,39,...|
|       0.0|           0.0|(108,[3,20,26,39,...|
|       0.0|           0.0|(108,[3,14,26,39,...|
|       0.0|           0.0|(108,[3,16,26,39,...|
|       0.0|           0.0|(108,[3,14,26,39,...|
|       0.0|           0.0|(108,[3,20,26,39,...|
|       0.0|           0.0|(108,[3,14,26,39,...|
|       0.0|        

### Matrice de confusion + précision

In [65]:
# (prediction, label) pairs for MulticlassMetrics; no orderBy, since row order
# does not affect the metrics and sorting only added a shuffle.
preds_and_labels = nb_prediction.select(
    'prediction',
    F.col('income_indexed').cast(FloatType()).alias('label'),
)
metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))
print(metrics.confusionMatrix().toArray())

[[10604.   506.]
 [ 2732.   827.]]


In [66]:

# Accuracy of the Naive Bayes pipeline on the test split.
nbaccuracy = evaluator.evaluate(nb_prediction)
print("Précision =", nbaccuracy)

Précision = 0.7792623900743063


# SVM ALGORITHM

In [73]:
# Linear SVM trained through the same preprocessing stages as the other models.
svm = LinearSVC(labelCol="income_indexed", featuresCol="features")

svm_stages = [indexer_feature, indexer_label, encoders, assembler, svm]
svm_model = Pipeline(stages=svm_stages).fit(train)

svm_prediction = svm_model.transform(test)
svm_prediction.select("prediction", "income_indexed", "features").show()

+----------+--------------+--------------------+
|prediction|income_indexed|            features|
+----------+--------------+--------------------+
|       0.0|           0.0|(108,[3,14,26,39,...|
|       0.0|           0.0|(108,[3,9,26,39,4...|
|       0.0|           0.0|(108,[3,16,26,39,...|
|       0.0|           0.0|(108,[3,10,26,39,...|
|       0.0|           0.0|(108,[3,14,26,39,...|
|       0.0|           0.0|(108,[3,16,26,39,...|
|       0.0|           0.0|(108,[3,16,26,39,...|
|       0.0|           0.0|(108,[3,14,26,39,...|
|       0.0|           0.0|(108,[3,16,26,39,...|
|       0.0|           0.0|(108,[3,16,26,39,...|
|       0.0|           0.0|(108,[3,16,26,39,...|
|       0.0|           0.0|(108,[3,20,26,39,...|
|       0.0|           0.0|(108,[3,14,26,39,...|
|       0.0|           0.0|(108,[3,16,26,39,...|
|       0.0|           0.0|(108,[3,14,26,39,...|
|       0.0|           0.0|(108,[3,20,26,39,...|
|       0.0|           0.0|(108,[3,14,26,39,...|
|       0.0|        

### Matrice de confusion + précision

In [74]:
# (prediction, label) pairs for MulticlassMetrics; no orderBy, since row order
# does not affect the metrics and sorting only added a shuffle.
preds_and_labels = svm_prediction.select(
    'prediction',
    F.col('income_indexed').cast(FloatType()).alias('label'),
)
metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))
print(metrics.confusionMatrix().toArray())

[[10424.   686.]
 [ 1510.  2049.]]


In [80]:
# Accuracy of the linear SVM on the test split.
svm_accuracy = evaluator.evaluate(svm_prediction)
print("Test accuracy =", svm_accuracy)

Test accuracy = 0.8502965437316791


# Gradient Boosted Tree ALGORITHM

In [78]:
# Gradient-boosted trees, capped at maxIter=10 boosting iterations.
gbt = GBTClassifier(labelCol="income_indexed", featuresCol="features", maxIter=10)

gbt_stages = [indexer_feature, indexer_label, encoders, assembler, gbt]
gbt_model = Pipeline(stages=gbt_stages).fit(train)

gbt_prediction = gbt_model.transform(test)
gbt_prediction.select("prediction", "income_indexed", "features").show()

+----------+--------------+--------------------+
|prediction|income_indexed|            features|
+----------+--------------+--------------------+
|       0.0|           0.0|(108,[3,14,26,39,...|
|       0.0|           0.0|(108,[3,9,26,39,4...|
|       0.0|           0.0|(108,[3,16,26,39,...|
|       0.0|           0.0|(108,[3,10,26,39,...|
|       0.0|           0.0|(108,[3,14,26,39,...|
|       0.0|           0.0|(108,[3,16,26,39,...|
|       0.0|           0.0|(108,[3,16,26,39,...|
|       0.0|           0.0|(108,[3,14,26,39,...|
|       0.0|           0.0|(108,[3,16,26,39,...|
|       0.0|           0.0|(108,[3,16,26,39,...|
|       0.0|           0.0|(108,[3,16,26,39,...|
|       0.0|           0.0|(108,[3,20,26,39,...|
|       0.0|           0.0|(108,[3,14,26,39,...|
|       0.0|           0.0|(108,[3,16,26,39,...|
|       0.0|           0.0|(108,[3,14,26,39,...|
|       0.0|           0.0|(108,[3,20,26,39,...|
|       0.0|           0.0|(108,[3,14,26,39,...|
|       0.0|        

### Matrice de confusion + précision

In [77]:
# (prediction, label) pairs for MulticlassMetrics; no orderBy, since row order
# does not affect the metrics and sorting only added a shuffle.
preds_and_labels = gbt_prediction.select(
    'prediction',
    F.col('income_indexed').cast(FloatType()).alias('label'),
)
metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))
print(metrics.confusionMatrix().toArray())

22/10/06 12:56:04 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/10/06 12:56:04 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS


[[10512.   598.]
 [ 1585.  1974.]]


In [81]:
# Accuracy of the gradient-boosted trees on the test split.
gbt_accuracy = evaluator.evaluate(gbt_prediction)
print("Test accuracy =", gbt_accuracy)

Test accuracy = 0.8511827663780762
