# Initialization

In [None]:
import os
import sys
import pyspark
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns


from pyspark.sql import SparkSession


from pyspark.ml import Pipeline

from pyspark.ml.classification import LogisticRegression, LinearSVC, RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator



In [2]:
print(sys.executable)    # vérification
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

print(pyspark.__version__)

/Users/gaelleba/anaconda3/envs/sparkEnv/bin/python
3.5.4


# Start Spark session and open pre-treated dataset

In [3]:
# create a spark session
spark = SparkSession \
.builder \
.master("local") \
.config("spark.executor.memory", "8g") \
.config("spark.driver.memory", "8g") \
.config("spark.memory.fraction", "0.6") \
.config("spark.memory.storageFraction", "0.3") \
.config("spark.executor.cores", "4") \
.config("spark.sql.shuffle.partitions", "200") \
.config("spark.driver.extraJavaOptions", "-XX:ReservedCodeCacheSize=512m") \
.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/10 22:38:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
folder_models="binary_models_sample"
df_name='df_BioBertFull_binary_classif'

In [5]:
# Load the pre-treated dataset
df_classif = spark.read.parquet(f"{folder_models}/{df_name}")
df_classif.printSchema()
print("Features vector size:", df_classif.select("features").head()[0].size)

                                                                                

root
 |-- features: vector (nullable = true)
 |-- label: integer (nullable = true)

Features vector size: 801


                                                                                

In [6]:
# Partitionner les données en apprentissage (80%) et test (20%)
partitions = df_classif.randomSplit([0.8, 0.2], seed=100)
train = partitions[0].cache()  # conservé en mémoire
test = partitions[1]

In [None]:
train.write.mode("overwrite").parquet(f"{folder_models}/train_full")
test.write.mode("overwrite").parquet(f"{folder_models}/test_full")

# Binary classification

## SVM

In [8]:
# Définition de l'estimateur SVC linéaire
linSvc = LinearSVC()\
                .setFeaturesCol("features") \
                .setLabelCol("label")\

# Définition du pipeline (réduit ici au SVM linéaire)
pipeline_svc = Pipeline().setStages([linSvc])

In [None]:
# Définition des hyperparamètres
paramGrid_svc = ParamGridBuilder().addGrid(linSvc.regParam,[0.1, 0.3, 0.5, 0.6, 0.8]) \
                              .addGrid(linSvc.maxIter, [5, 10, 15, 20])\
                              .build()

# Définition de l'instance de CrossValidator : à quel estimateur l'appliquer,
cv_svc = CrossValidator().setEstimator(pipeline_svc) \
                     .setEstimatorParamMaps(paramGrid_svc) \
                     .setNumFolds(5) \
                     .setEvaluator(BinaryClassificationEvaluator())

# Construction et évaluation par validation croisée des modèles correspondant
cvSVCmodel = cv_svc.fit(train)

25/02/10 13:33:11 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS


CodeCache: size=524288Kb used=36058Kb max_used=37136Kb free=488229Kb
 bounds [0x000000010a1e8000, 0x000000010c658000, 0x000000012a1e8000]
 total_blobs=13366 nmethods=12138 adapters=1139
 compilation: disabled (not enough contiguous free space left)


In [10]:
# Afficher les valeurs AUC obtenues pour les combinaisons de la grille
print(cvSVCmodel.avgMetrics)

# Afficher les meilleures valeurs pour les hyperparamètres
cvSVCmodel.getEstimatorParamMaps()[cvSVCmodel.avgMetrics.index(max(cvSVCmodel.avgMetrics))]

[0.5608073224103309, 0.5382806618555639, 0.5314840459206902, 0.5317214715305948, 0.5626628585671897, 0.5413881176502368, 0.5387794508842225, 0.5393233279872522, 0.5645334961674022, 0.548569065557419, 0.5419800698183377, 0.5427868960093243, 0.565154488042098, 0.5484751434316775, 0.5421613771118905, 0.5442804801611161, 0.5655526773272224, 0.5494098973152837, 0.5432718481154601, 0.5470454816143779]


{Param(parent='LinearSVC_fe8817195cd6', name='regParam', doc='regularization parameter (>= 0).'): 0.8,
 Param(parent='LinearSVC_fe8817195cd6', name='maxIter', doc='max number of iterations (>= 0).'): 5}

In [None]:
cvSVCmodel.write().overwrite().save(f"{folder_models}/cvSVCmodel_fullEmbed")

## Regression logistique

In [None]:
lr = LogisticRegression(featuresCol = 'features', labelCol = 'label')
pipelineLR = Pipeline().setStages([lr])

In [13]:
# Définition des hyperparamètres
paramGrid_lr = (ParamGridBuilder()
.addGrid(lr.regParam, [0.1, 0.2, 0.3, 0.4, 0.5])
.addGrid(lr.elasticNetParam, [0, 0.01,0.1,0.2])
.addGrid(lr.maxIter, [5, 10, 25])
.build())

cv_lr = CrossValidator().setEstimator(pipelineLR) \
                           .setEstimatorParamMaps(paramGrid_lr) \
                           .setNumFolds(5) \
                           .setEvaluator(BinaryClassificationEvaluator())
                           
cvLRmodel = cv_lr.fit(train)

In [14]:
# Afficher les valeurs AUC obtenues pour les combinaisons de la grille
print(cvLRmodel.avgMetrics)

# Afficher les meilleures valeurs pour les hyperparamètres
cvLRmodel.getEstimatorParamMaps()[cvLRmodel.avgMetrics.index(max(cvLRmodel.avgMetrics))]

[0.5329373406082653, 0.5271157413533724, 0.5292993857628553, 0.5402426068020111, 0.5301460546748644, 0.5315136599422282, 0.5545420888590893, 0.5517441817886981, 0.5508703628896314, 0.55074280719047, 0.5519703821574722, 0.5518353570596639, 0.5369698342788631, 0.5343785315405081, 0.5352584010625839, 0.544783203168583, 0.5387942829657141, 0.5400132929585777, 0.5517909752757892, 0.5516797337202456, 0.5518604688890936, 0.5201829941663194, 0.5203871580909234, 0.5197779985116856, 0.5394373953567244, 0.5398499504200528, 0.5398391112686907, 0.5479597784955788, 0.545221955889667, 0.5453324526626568, 0.546764067220335, 0.5490571394634352, 0.5490001585537116, 0.5, 0.5, 0.5, 0.5420544965208451, 0.5431994476922366, 0.5433296519252148, 0.5510509252554859, 0.5493863857874037, 0.5492632710089861, 0.5202903218155246, 0.5205317231675577, 0.5204009695397522, 0.5, 0.5, 0.5, 0.5448454191371948, 0.5458578975158234, 0.54598105858535, 0.553018490178993, 0.5527544363639375, 0.5528586702968628, 0.501287242169595

{Param(parent='LogisticRegression_a2bab1ab491c', name='regParam', doc='regularization parameter (>= 0).'): 0.1,
 Param(parent='LogisticRegression_a2bab1ab491c', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.1,
 Param(parent='LogisticRegression_a2bab1ab491c', name='maxIter', doc='max number of iterations (>= 0).'): 5}

In [None]:
cvLRmodel.write().overwrite().save(f"{folder_models}/cvLRmodel_fullEmbed")

## Random forests

In [16]:
rf = RandomForestClassifier() \
              .setFeaturesCol("features") \
              .setLabelCol("label")\
                  . setSeed(100)

pipelineRF = Pipeline().setStages([rf])

In [None]:
paramGrid_rf = ParamGridBuilder() \
                .addGrid(rf.maxDepth, [3, 5, 8]) \
                .addGrid(rf.numTrees, [100, 125, 150]) \
                .addGrid(rf.minInstancesPerNode, [2, 5, 10])\
                .addGrid(rf.featureSubsetStrategy, ["sqrt", "log2"])\
                .build()

cv_rf = CrossValidator().setEstimator(pipelineRF) \
                           .setEstimatorParamMaps(paramGrid_rf) \
                           .setNumFolds(5) \
                           .setEvaluator(BinaryClassificationEvaluator())

cvRFmodel = cv_rf.fit(train)

25/02/10 13:38:21 WARN DAGScheduler: Broadcasting large task binary with size 1013.3 KiB
25/02/10 13:38:30 WARN DAGScheduler: Broadcasting large task binary with size 1014.1 KiB
25/02/10 13:38:40 WARN DAGScheduler: Broadcasting large task binary with size 1002.4 KiB
25/02/10 13:38:50 WARN DAGScheduler: Broadcasting large task binary with size 1138.1 KiB
25/02/10 13:39:01 WARN DAGScheduler: Broadcasting large task binary with size 1141.7 KiB
25/02/10 13:39:12 WARN DAGScheduler: Broadcasting large task binary with size 1124.1 KiB
25/02/10 13:39:23 WARN DAGScheduler: Broadcasting large task binary with size 1267.0 KiB
25/02/10 13:39:24 WARN DAGScheduler: Broadcasting large task binary with size 1801.7 KiB
25/02/10 13:39:26 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB
25/02/10 13:39:28 WARN DAGScheduler: Broadcasting large task binary with size 1572.2 KiB
25/02/10 13:39:32 WARN DAGScheduler: Broadcasting large task binary with size 1130.1 KiB
25/02/10 13:39:33 WARN D

In [18]:
# Afficher les valeurs AUC obtenues pour les combinaisons de la grille
print(cvRFmodel.avgMetrics)

# Afficher les meilleures valeurs pour les hyperparamètres
cvRFmodel.getEstimatorParamMaps()[cvRFmodel.avgMetrics.index(max(cvRFmodel.avgMetrics))]

[0.5500347648139722, 0.5521388348631524, 0.5536929174558083, 0.5510709374948402, 0.5489249899030673, 0.5521134109731575, 0.5528785936287142, 0.5630448396991944, 0.5535707909034012, 0.5667522456840447, 0.5511441345084963, 0.5590454902244296, 0.5504401666145131, 0.5523726298344509, 0.5556989037311107, 0.5582534488742534, 0.5465185073834954, 0.5588372516226894, 0.5533815267264097, 0.5462265417949365, 0.5667936919464799, 0.557990286675728, 0.5500443739422667, 0.5482230430518511, 0.5486364072151719, 0.5608278324458981, 0.5648105076871871, 0.5583357868076131, 0.5524310252369871, 0.5534956661734913, 0.5558644630575018, 0.5459725623935895, 0.5491651539531136, 0.5435477235902939, 0.5538586125679535, 0.565650699262749, 0.5435033951984245, 0.5386488928746799, 0.5580789346250243, 0.5609973757040618, 0.5539535333846176, 0.54580273270739, 0.543023028566396, 0.5471454553541527, 0.5597912392372655, 0.547252877619529, 0.5442898362792532, 0.5449190395153295, 0.550988646544561, 0.5413617134042294, 0.5434

{Param(parent='RandomForestClassifier_1ca1315b5a79', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 5,
 Param(parent='RandomForestClassifier_1ca1315b5a79', name='numTrees', doc='Number of trees to train (>= 1).'): 100,
 Param(parent='RandomForestClassifier_1ca1315b5a79', name='minInstancesPerNode', doc='Minimum number of instances each child must have after split. If a split causes the left or right child to have fewer than minInstancesPerNode, the split will be discarded as invalid. Should be >= 1.'): 5,
 Param(parent='RandomForestClassifier_1ca1315b5a79', name='featureSubsetStrategy', doc="The number of features to consider for splits at each tree node. Supported options: 'auto' (choose automatically for task: If numTrees == 1, set to 'all'. If numTrees > 1 (forest), set to 'sqrt' for classification and to 'onethird' for regression), 'all' (use all features), 'onethird

In [None]:
# Importances des variables pour le meilleur modèle
best_RFmodel = cvRFmodel.bestModel.stages[-1]
feature_importances = best_RFmodel.featureImportances
print(feature_importances)

(788,[3,11,17,18,19,21,22,23,24,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,46,47,48,49,51,52,53,54,55,56,57,58,59,60,61,62,63,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99,100,101,102,103,104,106,107,108,109,110,111,112,113,114,115,116,117,118,119,120,121,122,123,124,125,126,127,128,129,130,131,132,133,134,135,136,137,138,139,140,141,142,143,144,145,146,147,148,149,150,151,152,153,154,155,156,157,158,159,160,161,162,163,164,165,166,167,168,169,170,171,172,173,174,175,176,177,178,179,180,181,182,183,184,185,186,187,188,189,190,191,192,193,194,195,196,197,198,199,200,201,202,203,204,205,206,207,209,210,211,212,213,214,215,216,217,219,220,221,222,223,224,225,226,227,228,229,230,231,232,233,234,235,236,237,238,239,240,241,242,244,245,246,247,248,249,250,251,252,253,254,255,256,257,258,260,261,262,263,264,265,266,267,268,269,270,271,272,273,274,275,276,277,279,280,281,282,283,284,285,286,287,288,289,290,292,293,294,296

In [None]:
cvRFmodel.write().overwrite().save(f"{folder_models}/cvRFmodel_fullEmbed")