# Préparation 
En local, télécharger l'archive Apache Spark depuis https://spark.apache.org/downloads.html.<br>
Définir les variables d'environnement `export PYSPARK_DRIVER_PYTHON=jupyter` et `export PYSPARK_DRIVER_PYTHON_OPTS='notebook'`.<br>
Depuis le terminal, lancer `./pyspark` dans le dossier `spark/bin`.

In [1]:
# Principaux import
import pyspark
import findspark
from pyspark.sql import SparkSession 
from pyspark import SparkConf 
from pyspark.ml.classification import *
from pyspark.ml.feature import *
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import Pipeline
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
# pour les dataframe et udf
from pyspark.sql import *  
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.evaluation import *

from datetime import *

# pour le chronomètre
import time

# Initialise the environment variables / sys.path entries Spark needs (findspark).
# NOTE(review): findspark.init() is meant to run BEFORE `import pyspark`; pyspark is
# already imported above, so this call cannot change which pyspark was loaded —
# consider moving it above the pyspark imports.
findspark.init()

# Start the Spark session
# --------------------------
def demarrer_spark():
    """Create (or reuse, via getOrCreate) the local SparkSession used by the notebook.

    The session runs on all local cores ("local[*]"), uses an in-memory catalog,
    disables automatic broadcast joins and starts with 4 shuffle partitions.
    Prints the application id and returns the session.

    NOTE(review): spark.driver.memory only takes effect if the driver JVM is not
    already running (e.g. it is ignored when the notebook is launched via ./pyspark).
    """
    conf = SparkConf().setAppName("Project BDLE").setMaster("local[*]")
    for key, value in (
        ("spark.executor.memory", "6G"),
        ("spark.driver.memory", "6G"),
        ("spark.sql.catalogImplementation", "in-memory"),
    ):
        conf.set(key, value)

    session = SparkSession.builder.config(conf=conf).getOrCreate()
    context = session.sparkContext
    context.setLogLevel("ERROR")

    # -1 disables automatic broadcast joins.
    session.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
    # Small shuffle partition count suited to a single machine (Spark's default is 200).
    session.conf.set("spark.sql.shuffle.partitions", "4")
    print("session démarrée, son id est ", context.applicationId)
    return session


spark = demarrer_spark()

session démarrée, son id est  local-1636194557636


In [2]:
# Override the 4 shuffle partitions configured in demarrer_spark() with 8.
partitions_shuffle = "8"
spark.conf.set("spark.sql.shuffle.partitions", partitions_shuffle)
print("Nombre de partitions utilisées : ", spark.conf.get("spark.sql.shuffle.partitions"))

Nombre de partitions utilisées :  8


# Lecture et Pré-traitement
### Adult Census Income 
Ces données ont été extraites de la base de données du bureau du recensement de 1994 par Ronny Kohavi et Barry Becker (Data Mining and Visualization, Silicon Graphics). La tâche de prédiction consiste à déterminer si une personne gagne plus de 50 000 $ par an.

À noter que ces données sont déséquilibrées : il existe plus de personnes gagnant moins de 50K\\$ par an que de personnes gagnant plus de 50K\\$.

In [3]:
# Explicit Spark schema for the 15 columns of the Adult files, in file order.
# The pandas frame has integer column labels 0..14, so the names come from here.
_adult_columns = [
    ("age", IntegerType()),
    ("workClass", StringType()),
    ("fnlwgt", IntegerType()),
    ("education", StringType()),
    ("education-num", IntegerType()),
    ("marital-status", StringType()),
    ("occupation", StringType()),
    ("relationship", StringType()),
    ("race", StringType()),
    ("sex", StringType()),
    ("capital-gain", IntegerType()),
    ("capital-loss", IntegerType()),
    ("hours-per-week", IntegerType()),
    ("native-country", StringType()),
    ("income", StringType()),
]
Schema = StructType([StructField(name, dtype) for name, dtype in _adult_columns])



# Load the train and test files of the Adult dataset and stack them into one frame.
DATA_DIR = '/Users/addadyouva/Downloads/BDLE/Adult_data'
# The files use ", " as separator: skipinitialspace drops the space after each comma,
# so categories read "Private" (not " Private") and missing values are exactly "?".
# comment='|' skips a '|'-prefixed banner line (the UCI copy of adult.test starts
# with "|1x3 Cross validator"); it has no effect if that line is absent.
read_options = {'header': None, 'na_values': '?', 'skipinitialspace': True, 'comment': '|'}
df1 = pd.read_csv(f'{DATA_DIR}/adult.data', **read_options)
df2 = pd.read_csv(f'{DATA_DIR}/adult.test', **read_options)
df_concat = pd.concat([df1, df2], ignore_index=True)
# Test-file labels end with a period ("<=50K."); strip it so both files share the
# same two classes. The .str accessor leaves NaN untouched instead of crashing.
df_concat[14] = df_concat[14].str.rstrip('.')

# Taux de valeurs manquantes

In [None]:
# Fraction of rows that contain at least one missing value.
nb_lignes_completes = len(df_concat.dropna())
1 - nb_lignes_completes / len(df_concat)

Comme le taux de valeurs manquantes est négligeable, nous pouvons directement supprimer les lignes concernées.

In [4]:
# Drop incomplete rows, then load the cleaned frame into Spark with the explicit schema.
df_concat = df_concat.dropna()
AdultCensusIncome = spark.createDataFrame(df_concat, schema=Schema)
AdultCensusIncome.printSchema()
# Display the Spark DataFrame as a pandas table in the notebook.
AdultCensusIncome.toPandas()

root
 |-- age: integer (nullable = true)
 |-- workClass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)



Unnamed: 0,age,workClass,fnlwgt,education,education-num,marital-status,occupation,relationship,race,sex,capital-gain,capital-loss,hours-per-week,native-country,income
0,39,State-gov,77516,Bachelors,13,Never-married,Adm-clerical,Not-in-family,White,Male,2174,0,40,United-States,<=50K
1,50,Self-emp-not-inc,83311,Bachelors,13,Married-civ-spouse,Exec-managerial,Husband,White,Male,0,0,13,United-States,<=50K
2,38,Private,215646,HS-grad,9,Divorced,Handlers-cleaners,Not-in-family,White,Male,0,0,40,United-States,<=50K
3,53,Private,234721,11th,7,Married-civ-spouse,Handlers-cleaners,Husband,Black,Male,0,0,40,United-States,<=50K
4,28,Private,338409,Bachelors,13,Married-civ-spouse,Prof-specialty,Wife,Black,Female,0,0,40,Cuba,<=50K
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
45217,33,Private,245211,Bachelors,13,Never-married,Prof-specialty,Own-child,White,Male,0,0,40,United-States,<=50K
45218,39,Private,215419,Bachelors,13,Divorced,Prof-specialty,Not-in-family,White,Female,0,0,36,United-States,<=50K
45219,38,Private,374983,Bachelors,13,Married-civ-spouse,Prof-specialty,Husband,White,Male,0,0,50,United-States,<=50K
45220,44,Private,83891,Bachelors,13,Divorced,Adm-clerical,Own-child,Asian-Pac-Islander,Male,5455,0,40,United-States,<=50K


On remarque ici le déséquilibre des classes : il y a plus de personnes gagnant moins de 50K\\$ par an. Cela complique la tâche de classification, car l'algorithme supervisé aura tendance à prédire majoritairement qu'une personne gagne moins de 50K\\$.

In [None]:
# Number of rows per income class (class balance of the target).
(AdultCensusIncome
    .groupBy('income')
    .agg(count('*').alias('Target'))
    .show())

# Statistiques descriptives des données

In [None]:
# Local pandas copy of the Spark data, used by the exploratory plots.
df = AdultCensusIncome.toPandas()

In [None]:
# Summary statistics (count, mean, std, quartiles) of the numeric columns.
df.describe()

In [None]:
# One histogram per numeric column, on a large grid.
df.hist(figsize=(20, 20))

# Salaire par rapport à la classe de travail

In [None]:
# Count of people per work class, split by income bracket.
fig, ax = plt.subplots(figsize=(10, 5))
sns.countplot(data=df, x='workClass', hue='income', ax=ax)
fig.tight_layout()
plt.show()

# Salaire par rapport au type de diplôme

In [None]:
# Count of people per education level, split by income bracket.
plt.figure(figsize=(20, 10))
sns.countplot(x='education', hue='income', data=df)
# Education labels are long; rotate them so neighbouring tick labels do not overlap.
plt.xticks(rotation=45, ha='right')
plt.tight_layout()
plt.show()

# Salaire selon le nombre d'heures de travail par semaine

In [None]:
# Distribution of weekly working hours for each income bracket.
plt.figure(figsize=(10, 5))
# hue repeats x only to colour each violin; dodge=False keeps each violin centred on
# its own tick instead of reserving an empty slot for the other hue level.
sns.violinplot(x='income', y='hours-per-week', hue='income', data=df, dodge=False)
plt.tight_layout()
plt.show()

# Salaire par rapport au sexe

In [None]:
# Count of people per sex, split by income bracket.
fig, ax = plt.subplots(figsize=(10, 5))
sns.countplot(data=df, x='sex', hue='income', ax=ax)
fig.tight_layout()
plt.show()

# Salaire par rapport à la profession

In [None]:
# Count of people per occupation, split by income bracket.
plt.figure(figsize=(20, 10))
sns.countplot(x='occupation', hue='income', data=df)
# Occupation labels are long; rotate them so neighbouring tick labels do not overlap.
plt.xticks(rotation=45, ha='right')
plt.tight_layout()
plt.show()

# Salaire par rapport à la race

In [None]:
# Count of people per race category, split by income bracket.
fig, ax = plt.subplots(figsize=(20, 10))
sns.countplot(data=df, x='race', hue='income', ax=ax)
fig.tight_layout()
plt.show()

# Salaire par rapport à l'âge

In [None]:
# Distribution of age for each income bracket.
plt.figure(figsize=(10, 5))
# hue repeats x only to colour each violin; dodge=False keeps each violin centred on
# its own tick instead of reserving an empty slot for the other hue level.
sns.violinplot(x='income', y='age', hue='income', data=df, dodge=False)
plt.tight_layout()
plt.show()

# Transformation des données

In [None]:
# Split the schema into string features (target excluded) and integer features.
column_types = AdultCensusIncome.dtypes
categorical_columns = [name for name, dtype in column_types if dtype != 'int' and name != 'income']
numerical_columns = [name for name, dtype in column_types if dtype == 'int']
categorical_columns_indexed = [f'{name}_indexed' for name in categorical_columns]

In [None]:
# Target indexer: StringIndexer orders labels by descending frequency by default,
# so the majority class "<=50K" becomes 0.0 and ">50K" becomes 1.0.
stringIndexerIncome = StringIndexer(inputCol='income', outputCol='income_indexed')
# handleInvalid='keep': a rare category can land only in the test part of a random
# split; with the default 'error' the fitted model would raise on that unseen label
# at transform time. 'keep' maps it to an extra index instead.
stringIndexer = StringIndexer(
    inputCols=categorical_columns,
    outputCols=categorical_columns_indexed,
    handleInvalid='keep',
)
# One-hot encode the indexed categories, then assemble numeric + encoded columns.
oneHotIndexer = OneHotEncoder(
    inputCols=categorical_columns_indexed,
    outputCols=[c + '_encoded' for c in categorical_columns_indexed],
)
vectAssemble = VectorAssembler(
    inputCols=numerical_columns + oneHotIndexer.getOutputCols(),
    outputCol='features',
)

# Learn First Model

# Logistic Regression

In [None]:
# Fixed seed so the split (and therefore the reported scores) is reproducible.
(trainingData, testData) = AdultCensusIncome.randomSplit([0.7, 0.3], seed=42)
lr = LogisticRegression(featuresCol='features', labelCol="income_indexed")
piple = Pipeline(stages=[stringIndexerIncome, stringIndexer, oneHotIndexer, vectAssemble, lr])
model = piple.fit(trainingData)
predictions = model.transform(testData)

In [None]:
# Area under the ROC curve on the test split (the evaluator's default metric).
evaluator = BinaryClassificationEvaluator(
    labelCol='income_indexed',
    rawPredictionCol='rawPrediction',
    metricName='areaUnderROC',
)
print(evaluator.evaluate(predictions))

In [None]:
# Confusion matrix on the test split. One groupBy replaces four filter().count()
# jobs, each of which re-evaluated the (uncached) fitted pipeline on testData.
confusion = {
    (row['prediction'], row['income_indexed']): row['count']
    for row in predictions.groupBy('prediction', 'income_indexed').count().collect()
}
tp = float(confusion.get((1.0, 1.0), 0))
fp = float(confusion.get((1.0, 0.0), 0))
tn = float(confusion.get((0.0, 0.0), 0))
fn = float(confusion.get((0.0, 1.0), 0))

# Guard each ratio: a model that never predicts class 1.0 would otherwise raise
# ZeroDivisionError on Precision and f-mesure.
Accuracy = (tp + tn) / (tp + fp + tn + fn) if tp + fp + tn + fn else 0.0
Precision = tp / (tp + fp) if tp + fp else 0.0
Recall = tp / (tp + fn) if tp + fn else 0.0
fmesure = 2 * (Precision * Recall) / (Precision + Recall) if Precision + Recall else 0.0

metrics = spark.createDataFrame([
 ("TP", tp),
 ("FP", fp),
 ("TN", tn),
 ("FN", fn),
 ("Accuracy", Accuracy),
 ("Precision", Precision),
 ("Recall", Recall),
 ("f-mesure", fmesure)], ["metric", "value"])
metrics.show()

# Random Forest

In [None]:
# Fixed seed so the split (and therefore the reported scores) is reproducible.
(trainingData, testData) = AdultCensusIncome.randomSplit([0.7, 0.3], seed=42)
rf = RandomForestClassifier(featuresCol="features", labelCol="income_indexed")
piple = Pipeline(stages=[stringIndexerIncome, stringIndexer, oneHotIndexer, vectAssemble, rf])
model = piple.fit(trainingData)
predictions = model.transform(testData)

In [None]:
# Area under the ROC curve on the test split (the evaluator's default metric).
evaluator = BinaryClassificationEvaluator(
    labelCol='income_indexed',
    rawPredictionCol='rawPrediction',
    metricName='areaUnderROC',
)
print(evaluator.evaluate(predictions))

In [None]:
# Confusion matrix on the test split. One groupBy replaces four filter().count()
# jobs, each of which re-evaluated the (uncached) fitted pipeline on testData.
confusion = {
    (row['prediction'], row['income_indexed']): row['count']
    for row in predictions.groupBy('prediction', 'income_indexed').count().collect()
}
tp = float(confusion.get((1.0, 1.0), 0))
fp = float(confusion.get((1.0, 0.0), 0))
tn = float(confusion.get((0.0, 0.0), 0))
fn = float(confusion.get((0.0, 1.0), 0))

# Guard each ratio: a model that never predicts class 1.0 would otherwise raise
# ZeroDivisionError on Precision and f-mesure.
Accuracy = (tp + tn) / (tp + fp + tn + fn) if tp + fp + tn + fn else 0.0
Precision = tp / (tp + fp) if tp + fp else 0.0
Recall = tp / (tp + fn) if tp + fn else 0.0
fmesure = 2 * (Precision * Recall) / (Precision + Recall) if Precision + Recall else 0.0

metrics = spark.createDataFrame([
 ("TP", tp),
 ("FP", fp),
 ("TN", tn),
 ("FN", fn),
 ("Accuracy", Accuracy),
 ("Precision", Precision),
 ("Recall", Recall),
 ("f-mesure", fmesure)], ["metric", "value"])
metrics.show()

# Linear SVM

In [None]:
# Fixed seed so the split (and therefore the reported scores) is reproducible.
(trainingData, testData) = AdultCensusIncome.randomSplit([0.7, 0.3], seed=42)
ls = LinearSVC(featuresCol="features", labelCol="income_indexed")
piple = Pipeline(stages=[stringIndexerIncome, stringIndexer, oneHotIndexer, vectAssemble, ls])
model = piple.fit(trainingData)
predictions = model.transform(testData)

In [None]:
# Area under the ROC curve on the test split (the evaluator's default metric).
evaluator = BinaryClassificationEvaluator(
    labelCol='income_indexed',
    rawPredictionCol='rawPrediction',
    metricName='areaUnderROC',
)
print(evaluator.evaluate(predictions))

In [None]:
# Confusion matrix on the test split. One groupBy replaces four filter().count()
# jobs, each of which re-evaluated the (uncached) fitted pipeline on testData.
confusion = {
    (row['prediction'], row['income_indexed']): row['count']
    for row in predictions.groupBy('prediction', 'income_indexed').count().collect()
}
tp = float(confusion.get((1.0, 1.0), 0))
fp = float(confusion.get((1.0, 0.0), 0))
tn = float(confusion.get((0.0, 0.0), 0))
fn = float(confusion.get((0.0, 1.0), 0))

# Guard each ratio: a model that never predicts class 1.0 would otherwise raise
# ZeroDivisionError on Precision and f-mesure.
Accuracy = (tp + tn) / (tp + fp + tn + fn) if tp + fp + tn + fn else 0.0
Precision = tp / (tp + fp) if tp + fp else 0.0
Recall = tp / (tp + fn) if tp + fn else 0.0
fmesure = 2 * (Precision * Recall) / (Precision + Recall) if Precision + Recall else 0.0

metrics = spark.createDataFrame([
 ("TP", tp),
 ("FP", fp),
 ("TN", tn),
 ("FN", fn),
 ("Accuracy", Accuracy),
 ("Precision", Precision),
 ("Recall", Recall),
 ("f-mesure", fmesure)], ["metric", "value"])
metrics.show()

# GBTClassifier

In [None]:
# Fixed seed so the split (and therefore the reported scores) is reproducible.
(trainingData, testData) = AdultCensusIncome.randomSplit([0.7, 0.3], seed=42)
gbt = GBTClassifier(featuresCol="features", labelCol="income_indexed")
piple = Pipeline(stages=[stringIndexerIncome, stringIndexer, oneHotIndexer, vectAssemble, gbt])
model = piple.fit(trainingData)
predictions = model.transform(testData)

In [None]:
# Peek at the assembled feature vectors (first 20 rows, truncated display).
predictions.select(col('features')).show(n=20, truncate=True)

In [None]:
# Area under the ROC curve on the test split (the evaluator's default metric).
evaluator = BinaryClassificationEvaluator(
    labelCol='income_indexed',
    rawPredictionCol='rawPrediction',
    metricName='areaUnderROC',
)
print(evaluator.evaluate(predictions))

In [None]:
# Confusion matrix on the test split. One groupBy replaces four filter().count()
# jobs, each of which re-evaluated the (uncached) fitted pipeline on testData.
confusion = {
    (row['prediction'], row['income_indexed']): row['count']
    for row in predictions.groupBy('prediction', 'income_indexed').count().collect()
}
tp = float(confusion.get((1.0, 1.0), 0))
fp = float(confusion.get((1.0, 0.0), 0))
tn = float(confusion.get((0.0, 0.0), 0))
fn = float(confusion.get((0.0, 1.0), 0))

# Guard each ratio: a model that never predicts class 1.0 would otherwise raise
# ZeroDivisionError on Precision and f-mesure.
Accuracy = (tp + tn) / (tp + fp + tn + fn) if tp + fp + tn + fn else 0.0
Precision = tp / (tp + fp) if tp + fp else 0.0
Recall = tp / (tp + fn) if tp + fn else 0.0
fmesure = 2 * (Precision * Recall) / (Precision + Recall) if Precision + Recall else 0.0

metrics = spark.createDataFrame([
 ("TP", tp),
 ("FP", fp),
 ("TN", tn),
 ("FN", fn),
 ("Accuracy", Accuracy),
 ("Precision", Precision),
 ("Recall", Recall),
 ("f-mesure", fmesure)], ["metric", "value"])
metrics.show()

# Amélioration et tuning

In [None]:
# Fit the feature-engineering stages once on the whole dataset and keep only the
# columns the cross-validated GBT needs, renaming the target to "label".
gbt = GBTClassifier(featuresCol="features", labelCol="label")
piple = Pipeline(stages=[stringIndexerIncome, stringIndexer, oneHotIndexer, vectAssemble])
data = piple.fit(AdultCensusIncome).transform(AdultCensusIncome)
data = data.select('features', col('income_indexed').alias('label'))
# Fixed seed so the split is reproducible across runs.
(trainingData, testData) = data.randomSplit([0.7, 0.3], seed=42)

In [None]:
# 2 x 2 x 2 = 8 GBT configurations to compare by cross-validation.
grid = (ParamGridBuilder()
        .addGrid(gbt.maxDepth, list(range(6, 10, 2)))   # [6, 8]
        .addGrid(gbt.maxBins, list(range(30, 36, 3)))   # [30, 33]
        .addGrid(gbt.maxIter, list(range(20, 60, 20)))  # [20, 40]
        .build())

In [None]:
# 3-fold cross-validation over the grid, keeping the configuration with the best
# area under ROC computed on the "label" column.
crossval = CrossValidator(
    estimator=gbt,
    estimatorParamMaps=grid,
    evaluator=BinaryClassificationEvaluator(labelCol='label'),
    numFolds=3,
)
cvModel = crossval.fit(trainingData)

In [None]:
# Score the test split with the best model selected by cross-validation.
predictions = cvModel.bestModel.transform(testData)

In [None]:
# Area under the ROC curve of the tuned model on the test split.
evaluator = BinaryClassificationEvaluator(
    labelCol='label',
    rawPredictionCol='rawPrediction',
    metricName='areaUnderROC',
)
print(evaluator.evaluate(predictions))

In [None]:
# Confusion matrix of the tuned model. The tuning frame only carries "features" and
# "label" (income_indexed was renamed), so the true class must be read from "label";
# filtering on income_indexed raised an AnalysisException here.
confusion = {
    (row['prediction'], row['label']): row['count']
    for row in predictions.groupBy('prediction', 'label').count().collect()
}
tp = float(confusion.get((1.0, 1.0), 0))
fp = float(confusion.get((1.0, 0.0), 0))
tn = float(confusion.get((0.0, 0.0), 0))
fn = float(confusion.get((0.0, 1.0), 0))

# Guard each ratio: a model that never predicts class 1.0 would otherwise raise
# ZeroDivisionError on Precision and f-mesure.
Accuracy = (tp + tn) / (tp + fp + tn + fn) if tp + fp + tn + fn else 0.0
Precision = tp / (tp + fp) if tp + fp else 0.0
Recall = tp / (tp + fn) if tp + fn else 0.0
fmesure = 2 * (Precision * Recall) / (Precision + Recall) if Precision + Recall else 0.0

metrics = spark.createDataFrame([
 ("TP", tp),
 ("FP", fp),
 ("TN", tn),
 ("FN", fn),
 ("Accuracy", Accuracy),
 ("Precision", Precision),
 ("Recall", Recall),
 ("f-mesure", fmesure)], ["metric", "value"])
metrics.show()

# with Standardization

In [None]:
# Standardize the assembled feature vector (zero mean, unit variance) before the GBT.
# NOTE(review): withMean=True produces dense vectors from the sparse one-hot input,
# and tree models are generally insensitive to per-feature affine scaling, so this
# step is not expected to change the GBT much — compare with the unscaled run.
scaler = StandardScaler(inputCol='features', outputCol='features_standard', withStd=True, withMean=True)

# Fixed seed so the split (and therefore the reported scores) is reproducible.
(trainingData, testData) = AdultCensusIncome.randomSplit([0.7, 0.3], seed=42)
gbt = GBTClassifier(featuresCol='features_standard', labelCol="income_indexed")
piple = Pipeline(stages=[stringIndexerIncome, stringIndexer, oneHotIndexer, vectAssemble, scaler, gbt])
model = piple.fit(trainingData)
predictions = model.transform(testData)

In [None]:
# Area under the ROC curve on the test split (the evaluator's default metric).
evaluator = BinaryClassificationEvaluator(
    labelCol="income_indexed",
    rawPredictionCol='rawPrediction',
    metricName='areaUnderROC',
)
print(evaluator.evaluate(predictions))

In [None]:
# Confusion matrix on the test split. One groupBy replaces four filter().count()
# jobs, each of which re-evaluated the (uncached) fitted pipeline on testData.
confusion = {
    (row['prediction'], row['income_indexed']): row['count']
    for row in predictions.groupBy('prediction', 'income_indexed').count().collect()
}
tp = float(confusion.get((1.0, 1.0), 0))
fp = float(confusion.get((1.0, 0.0), 0))
tn = float(confusion.get((0.0, 0.0), 0))
fn = float(confusion.get((0.0, 1.0), 0))

# Guard each ratio: a model that never predicts class 1.0 would otherwise raise
# ZeroDivisionError on Precision and f-mesure.
Accuracy = (tp + tn) / (tp + fp + tn + fn) if tp + fp + tn + fn else 0.0
Precision = tp / (tp + fp) if tp + fp else 0.0
Recall = tp / (tp + fn) if tp + fn else 0.0
fmesure = 2 * (Precision * Recall) / (Precision + Recall) if Precision + Recall else 0.0

metrics = spark.createDataFrame([
 ("TP", tp),
 ("FP", fp),
 ("TN", tn),
 ("FN", fn),
 ("Accuracy", Accuracy),
 ("Precision", Precision),
 ("Recall", Recall),
 ("f-mesure", fmesure)], ["metric", "value"])
metrics.show()

# Oversampling

In [9]:
# Show the first 20 target values.
AdultCensusIncome.select(col('income')).show(20)

+------+
|income|
+------+
| <=50K|
| <=50K|
| <=50K|
| <=50K|
| <=50K|
| <=50K|
| <=50K|
|  >50K|
|  >50K|
|  >50K|
|  >50K|
|  >50K|
| <=50K|
| <=50K|
| <=50K|
| <=50K|
| <=50K|
| <=50K|
|  >50K|
|  >50K|
+------+
only showing top 20 rows

