In [1]:
from pyspark.sql import SparkSession

# Smoke test: obtain a Spark session and display a one-row DataFrame.
spark = (
    SparkSession.builder
    .appName("CoursBigData")
    .getOrCreate()
)

# One row, two columns ("Statut", "OK"); show() prints it as an ASCII table.
df = spark.createDataFrame(
    [("Apprenant", 1)],
    schema=["Statut", "OK"],
)
df.show()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/02/20 10:09:10 WARN Utils: Your hostname, codespaces-e1abfc, resolves to a loopback address: 127.0.0.1; using 10.0.3.138 instead (on interface eth0)
26/02/20 10:09:10 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/02/20 10:09:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

+---------+---+
|   Statut| OK|
+---------+---+
|Apprenant|  1|
+---------+---+



In [2]:
import pyspark
from pyspark.sql import SparkSession

# Step 1 - get a session (this is the step that usually fails on a local setup).
# When a session already exists in the kernel, getOrCreate() hands it back and
# Spark logs a warning that only runtime SQL configurations take effect.
spark = (
    SparkSession.builder
    .appName("TestCoursBigData")
    .getOrCreate()
)

# Step 2 - a tiny in-memory dataset of (name, age) pairs.
data = [
    ("Alice", 34),
    ("Bob", 45),
    ("Charlie", 29),
]
df = spark.createDataFrame(data, schema=["Nom", "Age"])

# Step 3 - a simple transformation: keep rows whose Age is strictly above 30
# and print the result.
df.filter(df["Age"] > 30).show()

print("✅ Si vous voyez le tableau ci-dessus, Spark est opérationnel !")

26/02/20 10:09:57 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+-----+---+
|  Nom|Age|
+-----+---+
|Alice| 34|
|  Bob| 45|
+-----+---+

✅ Si vous voyez le tableau ci-dessus, Spark est opérationnel !


In [3]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Step 1 - toy dataset: one target column ("label") and two numeric inputs.
data = spark.createDataFrame(
    [
        (100.0, 2.0, 15.0),
        (200.0, 3.0, 25.0),
        (300.0, 5.0, 35.0),
    ],
    schema=["label", "feature1", "feature2"],
)

# Spark ML estimators read their inputs from a single vector column, so the
# two feature columns are packed into "features" before fitting.
assembler = VectorAssembler(
    outputCol="features",
    inputCols=["feature1", "feature2"],
)
training_data = assembler.transform(data)

# Step 2 - fit a regularised linear regression on the assembled data.
lr = LinearRegression(
    elasticNetParam=0.8,
    regParam=0.3,
    maxIter=10,
)
lr_model = lr.fit(training_data)

# Step 3 - report the fitted parameters.
print(f"Coefficients : {lr_model.coefficients}")
print(f"Interception : {lr_model.intercept}")

26/02/20 10:11:16 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
                                                                                

Coefficients : [1.1733726232939785,9.787407977673896]
Interception : -48.59644151949395


In [2]:
import os
import urllib.request
import numpy as np

# Build mnist_train.csv from the MNIST .npz archive: every row is the
# "y_train" value followed by the corresponding "x_train" image flattened to
# 28 * 28 integers. The whole "x_train"/"y_train" arrays are written, not a
# sample of them.
if not os.path.exists("mnist_train.csv"):
    npz_url = "https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz"
    npz_path = "mnist.npz"
    # The CSV is produced under a scratch name and renamed only once it is
    # complete. Otherwise an interrupted write would leave a truncated
    # mnist_train.csv that the existence check above would accept forever.
    csv_tmp_path = "mnist_train.csv.part"

    print("Téléchargement de MNIST... (un peu de patience)")
    try:
        urllib.request.urlretrieve(npz_url, npz_path)

        # A dedicated name is used for the archive handle so the notebook-level
        # `data` variable is not rebound to a closed NpzFile.
        with np.load(npz_path) as npz_archive:
            x_train = npz_archive["x_train"].reshape(-1, 28 * 28)
            y_train = npz_archive["y_train"].reshape(-1, 1)

        # Column 0 holds y_train, the remaining columns hold the flattened image.
        mnist_train = np.hstack([y_train, x_train])
        np.savetxt(csv_tmp_path, mnist_train, fmt="%d", delimiter=",")

        # Publish the finished file under its final name in a single step.
        os.replace(csv_tmp_path, "mnist_train.csv")
    finally:
        # Runs on success and on failure: drop the downloaded archive and any
        # half-written scratch CSV so a retry starts from a clean state.
        for leftover in (npz_path, csv_tmp_path):
            if os.path.exists(leftover):
                os.remove(leftover)
    print("Fini !")
else:
    print("mnist_train.csv existe déjà.")

Téléchargement de MNIST... (un peu de patience)
Fini !


In [4]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col

# Get (or start) a Spark session so this cell does not depend on the earlier
# cells having been executed.
spark = (
    SparkSession.builder
    .appName("MNIST_Preparation")
    .getOrCreate()
)

# Load the header-less CSV and let Spark infer the column types.
# Assumption: column 0 is the label and every other column is a pixel value.
raw_data = spark.read.csv(
    "mnist_train.csv",
    inferSchema=True,
    header=False,
)

# Split the auto-generated column names into the label column and the rest.
cols = raw_data.columns
label_source = cols[0]
pixel_columns = cols[1:]

# Give the first column an explicit "label" name.
data = raw_data.withColumnRenamed(label_source, "label")

# Pack all remaining columns into a single "features" vector column and keep
# only the two columns the classifiers need.
assembler = VectorAssembler(outputCol="features", inputCols=pixel_columns)
assembled = assembler.transform(data)
final_data = assembled.select("label", "features")

# 80 % / 20 % train/test split with a fixed seed.
train_data, test_data = final_data.randomSplit([0.8, 0.2], seed=42)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/02/20 14:04:34 WARN Utils: Your hostname, codespaces-e1abfc, resolves to a loopback address: 127.0.0.1; using 10.0.0.88 instead (on interface eth0)
26/02/20 14:04:34 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/02/20 14:04:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

In [5]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Fit a decision tree on the training split, using the classifier's default
# hyper-parameters apart from the column names.
dt = DecisionTreeClassifier(
    featuresCol="features",
    labelCol="label",
)
dt_model = dt.fit(train_data)

# Score the held-out split; transform() adds the "prediction" column that the
# evaluator reads.
dt_predictions = dt_model.transform(test_data)

# Accuracy evaluator comparing "prediction" against "label". It is kept in a
# notebook-level variable because the random-forest cell reuses it.
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    predictionCol="prediction",
    labelCol="label",
)
accuracy = evaluator.evaluate(dt_predictions)
print(f"Précision Arbre de Décision : {accuracy:.2%}")

26/02/20 14:05:52 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 19:>                                                         (0 + 2) / 2]

Précision Arbre de Décision : 67.17%


                                                                                

In [6]:
from pyspark.ml.classification import RandomForestClassifier

# Random forest with 20 trees, chosen by the author as a reasonable
# cost/quality trade-off for a Codespace.
rf = RandomForestClassifier(
    numTrees=20,
    featuresCol="features",
    labelCol="label",
)
rf_model = rf.fit(train_data)

# Score the held-out split.
rf_predictions = rf_model.transform(test_data)

# `evaluator` is the accuracy evaluator created in the decision-tree cell,
# which therefore has to be run before this one.
rf_accuracy = evaluator.evaluate(rf_predictions)
print(f"Précision Random Forest : {rf_accuracy:.2%}")

26/02/20 14:08:17 WARN DAGScheduler: Broadcasting large task binary with size 1051.3 KiB
[Stage 38:>                                                         (0 + 2) / 2]

Précision Random Forest : 83.35%


                                                                                