In [1]:
# Initialisation des librairies pyspark

# Initialisation de Spark
import pyspark

from pyspark.sql import SparkSession

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

from pyspark.sql.types import FloatType
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from pyspark.ml.classification import LogisticRegression
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.mllib.util import MLUtils
from pyspark.mllib.regression import LabeledPoint
#import pandas as pd

from pyspark import SparkConf, SparkContext, SQLContext

import findspark
findspark.init()

spark = SparkSession \
    .builder \
    .appName("MNIST Classifier") \
    .config('spark.sql.warehouse.dir', 'heart.csv') \
    .config('spark.executor.instances', 10) \
    .getOrCreate()


In [2]:
# Lecture du fichier mnist (partie train)


fileNameTrain = 'heart800.csv'
mnist_train = spark.read.csv(fileNameTrain, header=True)

# Lecture du fichier mnist (partie test)
fileNameTest = 'heart226.csv'
mnist_test = spark.read.csv(fileNameTest, header=True)

In [3]:
numeric_columns = ['ages', 'sex', 'cp', 'trestbps', 'chol', 'fbs', 'restecg', 'thalach', 'exang', 'oldpeak', 'slope', 'ca', 'thal', 'target']
for col_name in numeric_columns:
    mnist_train = mnist_train.withColumn(col_name, col(col_name).cast(FloatType()))


#afficher le schema
mnist_train.printSchema()




root
 |-- ages: float (nullable = true)
 |-- sex: float (nullable = true)
 |-- cp: float (nullable = true)
 |-- trestbps: float (nullable = true)
 |-- chol: float (nullable = true)
 |-- fbs: float (nullable = true)
 |-- restecg: float (nullable = true)
 |-- thalach: float (nullable = true)
 |-- exang: float (nullable = true)
 |-- oldpeak: float (nullable = true)
 |-- slope: float (nullable = true)
 |-- ca: float (nullable = true)
 |-- thal: float (nullable = true)
 |-- target: float (nullable = true)



In [4]:
# Liste des colonnes à convertir en types numériques
numeric_columns_test = ['ages', 'sex', 'cp', 'trestbps', 'chol', 'fbs', 'restecg', 'thalach', 'exang', 'oldpeak', 'slope', 'ca', 'thal', 'target']

# Convertir les colonnes en types numériques
for col_name in numeric_columns_test:
    mnist_test = mnist_test.withColumn(col_name, col(col_name).cast(FloatType()))

# Afficher le nouveau schéma
print("Schema of mnist_test after conversion:")
mnist_test.printSchema()

Schema of mnist_test after conversion:
root
 |-- ages: float (nullable = true)
 |-- sex: float (nullable = true)
 |-- cp: float (nullable = true)
 |-- trestbps: float (nullable = true)
 |-- chol: float (nullable = true)
 |-- fbs: float (nullable = true)
 |-- restecg: float (nullable = true)
 |-- thalach: float (nullable = true)
 |-- exang: float (nullable = true)
 |-- oldpeak: float (nullable = true)
 |-- slope: float (nullable = true)
 |-- ca: float (nullable = true)
 |-- thal: float (nullable = true)
 |-- target: float (nullable = true)



In [5]:
from pyspark.ml.feature import VectorAssembler

# Liste des colonnes à assembler (toutes sauf "target")
input_cols = [col_name for col_name in mnist_train.columns if col_name != 'target']

# Création du VectorAssembler
assembler = VectorAssembler(
    inputCols=input_cols,
    outputCol="features"
)

# Transformation sur le jeu de données d'entraînement
labeledPoints = assembler.transform(mnist_train)

# Création des colonnes "label" et "features" pour le jeu de données d'entraînement
labeledPoints = labeledPoints.select([ 'target', 'features'])

# Transformation sur le jeu de données de test
labeledPointsTest = assembler.transform(mnist_test)

# Création des colonnes "label" et "features" pour le jeu de données de test
labeledPointsTest = labeledPointsTest.select([ 'target', 'features'])

# Afficher les résultats
print("Labeled Points (Training Set):")
labeledPoints.show(truncate=False)

print("Labeled Points Test (Test Set):")
labeledPointsTest.show(truncate=False)


Labeled Points (Training Set):
+------+---------------------------------------------------------------------------+
|target|features                                                                   |
+------+---------------------------------------------------------------------------+
|0.0   |[52.0,1.0,0.0,125.0,212.0,0.0,1.0,168.0,0.0,1.0,2.0,2.0,3.0]               |
|0.0   |[53.0,1.0,0.0,140.0,203.0,1.0,0.0,155.0,1.0,3.0999999046325684,0.0,0.0,3.0]|
|0.0   |[70.0,1.0,0.0,145.0,174.0,0.0,1.0,125.0,1.0,2.5999999046325684,0.0,0.0,3.0]|
|0.0   |[61.0,1.0,0.0,148.0,203.0,0.0,1.0,161.0,0.0,0.0,2.0,1.0,3.0]               |
|0.0   |[62.0,0.0,0.0,138.0,294.0,1.0,1.0,106.0,0.0,1.899999976158142,1.0,3.0,2.0] |
|1.0   |(13,[0,3,4,7,9,10,12],[58.0,100.0,248.0,122.0,1.0,1.0,2.0])                |
|0.0   |[58.0,1.0,0.0,114.0,318.0,0.0,2.0,140.0,0.0,4.400000095367432,0.0,3.0,1.0] |
|0.0   |[55.0,1.0,0.0,160.0,289.0,0.0,0.0,145.0,1.0,0.800000011920929,1.0,1.0,3.0] |
|0.0   |[46.0,1.0,0.0,120.0,249.0,

In [6]:
labeledPoints.printSchema()
labeledPoints.show()

root
 |-- target: float (nullable = true)
 |-- features: vector (nullable = true)

+------+--------------------+
|target|            features|
+------+--------------------+
|   0.0|[52.0,1.0,0.0,125...|
|   0.0|[53.0,1.0,0.0,140...|
|   0.0|[70.0,1.0,0.0,145...|
|   0.0|[61.0,1.0,0.0,148...|
|   0.0|[62.0,0.0,0.0,138...|
|   1.0|(13,[0,3,4,7,9,10...|
|   0.0|[58.0,1.0,0.0,114...|
|   0.0|[55.0,1.0,0.0,160...|
|   0.0|[46.0,1.0,0.0,120...|
|   0.0|[54.0,1.0,0.0,122...|
|   1.0|[71.0,0.0,0.0,112...|
|   0.0|[43.0,0.0,0.0,132...|
|   1.0|[34.0,0.0,1.0,118...|
|   0.0|[51.0,1.0,0.0,140...|
|   0.0|[52.0,1.0,0.0,128...|
|   1.0|[34.0,0.0,1.0,118...|
|   1.0|[51.0,0.0,2.0,140...|
|   0.0|[54.0,1.0,0.0,124...|
|   1.0|[50.0,0.0,1.0,120...|
|   1.0|[58.0,1.0,2.0,140...|
+------+--------------------+
only showing top 20 rows



In [7]:
from pyspark.sql.functions import when

# Liste des colonnes à assembler (toutes sauf "target")
input_cols = [col_name for col_name in mnist_train.columns if col_name != 'target']

# Création du VectorAssembler
assembler1 = VectorAssembler(
    inputCols=input_cols,
    outputCol="features"
)

# Transformation sur le jeu de données d'entraînement
labeledPoints = assembler1.transform(mnist_train)

# Création de la colonne "labelIndex"
labeledPoints = labeledPoints.withColumn("labelIndex", when(col("target") == 0.0, 0).otherwise(1))

# Création des colonnes "target", "features" et "labelIndex" pour le jeu de données d'entraînement
labeledPoints = labeledPoints.select(['target', 'features', 'labelIndex'])

# Transformation sur le jeu de données de test
labeledPointsTest = assembler1.transform(mnist_test)

# Création de la colonne "labelIndex" pour le jeu de données de test
labeledPointsTest = labeledPointsTest.withColumn("labelIndex", when(col("target") == 0.0, 0).otherwise(1))

# Création des colonnes "target", "features" et "labelIndex" pour le jeu de données de test
labeledPointsTest = labeledPointsTest.select(['target', 'features', 'labelIndex'])

# Afficher les résultats
print("Labeled Points (Training Set):")
labeledPoints.show(truncate=False)

print("Labeled Points Test (Test Set):")
labeledPointsTest.show(truncate=False)


Labeled Points (Training Set):
+------+---------------------------------------------------------------------------+----------+
|target|features                                                                   |labelIndex|
+------+---------------------------------------------------------------------------+----------+
|0.0   |[52.0,1.0,0.0,125.0,212.0,0.0,1.0,168.0,0.0,1.0,2.0,2.0,3.0]               |0         |
|0.0   |[53.0,1.0,0.0,140.0,203.0,1.0,0.0,155.0,1.0,3.0999999046325684,0.0,0.0,3.0]|0         |
|0.0   |[70.0,1.0,0.0,145.0,174.0,0.0,1.0,125.0,1.0,2.5999999046325684,0.0,0.0,3.0]|0         |
|0.0   |[61.0,1.0,0.0,148.0,203.0,0.0,1.0,161.0,0.0,0.0,2.0,1.0,3.0]               |0         |
|0.0   |[62.0,0.0,0.0,138.0,294.0,1.0,1.0,106.0,0.0,1.899999976158142,1.0,3.0,2.0] |0         |
|1.0   |(13,[0,3,4,7,9,10,12],[58.0,100.0,248.0,122.0,1.0,1.0,2.0])                |1         |
|0.0   |[58.0,1.0,0.0,114.0,318.0,0.0,2.0,140.0,0.0,4.400000095367432,0.0,3.0,1.0] |0         |
|0.0   |[

In [8]:
print(labeledPoints.head())

Row(target=0.0, features=DenseVector([52.0, 1.0, 0.0, 125.0, 212.0, 0.0, 1.0, 168.0, 0.0, 1.0, 2.0, 2.0, 3.0]), labelIndex=0)


In [10]:
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression
from pyspark.ml import Pipeline

# Définition des classifieurs
classifiers = [
    RandomForestClassifier(labelCol="labelIndex", featuresCol="features", impurity='gini', maxBins=32),
    LogisticRegression(labelCol="labelIndex", featuresCol="features", maxIter=10, regParam=0.01)
]

# Utilisation d'un classifieur spécifique (Logistic Regression dans cet exemple)
classifier1 = LogisticRegression(labelCol="labelIndex", featuresCol="features", maxIter=10, regParam=0.01)

# Utilisation de la pipeline pour entraîner le classifieur
pipeline1 = Pipeline(stages=[classifier1])



model1= pipeline1.fit(labeledPoints)


# Utilisation du modèle pour prédire sur le jeu de données de test
predictions1 = model1.transform(labeledPointsTest)


# Affichage des résultats
predictions1.select("target", "prediction", "probability").show(truncate=False)



+------+----------+-----------------------------------------+
|target|prediction|probability                              |
+------+----------+-----------------------------------------+
|1.0   |1.0       |[0.3964653209487039,0.6035346790512961]  |
|0.0   |0.0       |[0.993737574696525,0.006262425303474983] |
|1.0   |1.0       |[0.21395375505566033,0.7860462449443397] |
|0.0   |1.0       |[0.30852682276643284,0.6914731772335672] |
|0.0   |1.0       |[0.34025853074659984,0.6597414692534002] |
|1.0   |1.0       |[0.19881077542777215,0.8011892245722279] |
|1.0   |0.0       |[0.9027991070450547,0.09720089295494527] |
|0.0   |0.0       |[0.9050606235793398,0.0949393764206602]  |
|1.0   |1.0       |[0.1121250493906999,0.8878749506093001]  |
|1.0   |1.0       |[0.3599667783562478,0.6400332216437522]  |
|1.0   |1.0       |[0.034128315062532395,0.9658716849374676]|
|0.0   |0.0       |[0.952211646320816,0.04778835367918399]  |
|0.0   |1.0       |[0.3899118070653097,0.6100881929346903]  |
|0.0   |

In [11]:
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression
from pyspark.ml import Pipeline

# Définition des classifieurs
classifiers = [
    RandomForestClassifier(labelCol="labelIndex", featuresCol="features", impurity='gini', maxBins=32),
    LogisticRegression(labelCol="labelIndex", featuresCol="features", maxIter=10, regParam=0.01)
]
classifier2 = RandomForestClassifier(labelCol="labelIndex", featuresCol="features", impurity='gini', maxBins=32)
pipeline2 =Pipeline(stages=[classifier2])
model2= pipeline2.fit(labeledPoints)
predictions2 = model2.transform(labeledPointsTest)
predictions2.select("target", "prediction", "probability").show(truncate=False)

+------+----------+----------------------------------------+
|target|prediction|probability                             |
+------+----------+----------------------------------------+
|1.0   |1.0       |[0.19776177658418473,0.8022382234158153]|
|0.0   |0.0       |[0.9092364162064346,0.09076358379356539]|
|1.0   |1.0       |[0.2353210042318515,0.7646789957681486] |
|0.0   |1.0       |[0.47898255634096926,0.5210174436590307]|
|0.0   |0.0       |[0.5425442965937375,0.4574557034062625] |
|1.0   |1.0       |[0.2017132952500681,0.7982867047499319] |
|1.0   |0.0       |[0.6450940697437775,0.35490593025622247]|
|0.0   |0.0       |[0.9429006210908591,0.05709937890914095]|
|1.0   |1.0       |[0.10349577070822931,0.8965042292917706]|
|1.0   |1.0       |[0.23509559685724674,0.7649044031427533]|
|1.0   |1.0       |[0.08373830567449896,0.916261694325501] |
|0.0   |0.0       |[0.9456514709711898,0.05434852902881008]|
|0.0   |1.0       |[0.42060091031993957,0.5793990896800604]|
|0.0   |0.0       |[0.75

In [12]:
from pyspark.ml.classification import GBTClassifier
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler

# Définition des classifieurs
gbt = GBTClassifier(labelCol="labelIndex", featuresCol="features", maxDepth=5, maxBins=32, maxIter=10)

# Création du pipeline avec XGBoost
pipeline_xgboost = Pipeline(stages=[
    gbt
])

# Entraînement du modèle
model_xgboost = pipeline_xgboost.fit(labeledPoints)

# Prédictions sur l'ensemble de test
predictions_xgboost = model_xgboost.transform(labeledPointsTest)

# Affichage des résultats
predictions_xgboost.select("target", "prediction", "probability").show(truncate=False)


+------+----------+----------------------------------------+
|target|prediction|probability                             |
+------+----------+----------------------------------------+
|1.0   |1.0       |[0.09085915055370573,0.9091408494462943]|
|0.0   |0.0       |[0.9354631521256775,0.06453684787432246]|
|1.0   |1.0       |[0.14266500952890523,0.8573349904710947]|
|0.0   |0.0       |[0.7640411747336352,0.2359588252663648] |
|0.0   |0.0       |[0.9064476959940395,0.09355230400596048]|
|1.0   |1.0       |[0.10203035191717273,0.8979696480828273]|
|1.0   |1.0       |[0.265716372628513,0.734283627371487]   |
|0.0   |0.0       |[0.9346020546165054,0.06539794538349464]|
|1.0   |1.0       |[0.05792650586679596,0.9420734941332041]|
|1.0   |1.0       |[0.10302216708110305,0.896977832918897] |
|1.0   |1.0       |[0.06335741157406234,0.9366425884259376]|
|0.0   |0.0       |[0.9300483363838002,0.06995166361619976]|
|0.0   |0.0       |[0.8497769438988433,0.15022305610115672]|
|0.0   |0.0       |[0.92

In [14]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

predictionsAndLabels_xgboost=predictions_xgboost.select("labelIndex", "prediction")
# Évaluateur d'accuracy
evaluator_accuracy = MulticlassClassificationEvaluator(labelCol="labelIndex", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator_accuracy.evaluate(predictionsAndLabels_xgboost)
print("Test set accuracy = " + str(accuracy))

# Évaluateur de F1
evaluator_f1 = MulticlassClassificationEvaluator(labelCol="labelIndex", predictionCol="prediction", metricName="f1")
f1 = evaluator_f1.evaluate(predictionsAndLabels_xgboost)
print("Test set f1 = " + str(f1))

# Évaluateur de weightedPrecision
evaluator_precision = MulticlassClassificationEvaluator(labelCol="labelIndex", predictionCol="prediction", metricName="weightedPrecision")
precision = evaluator_precision.evaluate(predictionsAndLabels_xgboost)
print("Test set weightedPrecision = " + str(precision))

# Évaluateur de weightedRecall
evaluator_recall = MulticlassClassificationEvaluator(labelCol="labelIndex", predictionCol="prediction", metricName="weightedRecall")
recall = evaluator_recall.evaluate(predictionsAndLabels_xgboost)
print("Test set weightedRecall = " + str(recall))


Test set accuracy = 0.9424778761061947
Test set f1 = 0.9424271719951054
Test set weightedPrecision = 0.9453769410586075
Test set weightedRecall = 0.9424778761061947


In [16]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline


# Sélection des colonnes pertinentes pour l'évaluation
predictionAndLabels_1 = predictions1.select("labelIndex", "prediction")

# Évaluation
evaluator_lr = MulticlassClassificationEvaluator(labelCol="labelIndex", predictionCol="prediction", metricName="accuracy")
accuracy_lr = evaluator_lr.evaluate(predictionAndLabels_1)

print("RandomForest Test set accuracy = " + str(accuracy_lr))


# Évaluateur de weightedPrecision
evaluator_precision = MulticlassClassificationEvaluator(labelCol="labelIndex", predictionCol="prediction", metricName="weightedPrecision")
precision = evaluator_precision.evaluate(predictionAndLabels_1)
print("Test set weightedPrecision = " + str(precision))


RandomForest Test set accuracy = 0.8008849557522124
Test set weightedPrecision = 0.8040996161099889


In [17]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline


# Sélection des colonnes pertinentes pour l'évaluation
predictionAndLabels_2 = predictions2.select("labelIndex", "prediction")

# Évaluation
evaluator_rf = MulticlassClassificationEvaluator(labelCol="labelIndex", predictionCol="prediction", metricName="accuracy")
accuracy_rf = evaluator_rf.evaluate(predictionAndLabels_2)

print("RandomForest Test set accuracy = " + str(accuracy_rf))


# Évaluateur de weightedPrecision
evaluator_precision = MulticlassClassificationEvaluator(labelCol="labelIndex", predictionCol="prediction", metricName="weightedPrecision")
precision = evaluator_precision.evaluate(predictionAndLabels_2)
print("Test set weightedPrecision = " + str(precision))


RandomForest Test set accuracy = 0.8584070796460177
Test set weightedPrecision = 0.8595520470977158


In [23]:
from pyspark.sql import SparkSession
from pyspark_dist_explore import hist

# Créer une session Spark
spark = SparkSession.builder.appName("Example").getOrCreate()

# Charger le fichier CSV dans un DataFrame PySpark
file_path = 'heart.csv'
df = spark.read.csv(file_path, header=True, inferSchema=True)

# Utiliser pyspark_dist_explore pour créer un histogramme
hist_data = hist(df, bins=20, color='skyblue', edgecolor='black')

# Extraire les données pour Matplotlib
bins = hist_data['bin_edges'][0]
freq = hist_data['bin_values'][0]

# Tracer l'histogramme avec Matplotlib
plt.bar(bins[:-1], freq, width=bins[1]-bins[0], color='skyblue', edgecolor='black')
plt.title('Répartition de l\'âge en fonction de la maladie cardiaque')
plt.xlabel('Âge')
plt.ylabel('Fréquence')
plt.show()


TypeError: hist() missing 1 required positional argument: 'x'