In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import col, when, sum , sqrt
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator


In [4]:
# Création de la session Spark
spark = SparkSession.builder.appName("AsteroidsApp").getOrCreate()


In [5]:
# Lecture du CSV
df = spark.read.option("header", True).csv("hdfs://namenode:9000/user/hdfs/kafka_data/celestial_bodies.csv")

In [6]:
df.show(5)

+------------+--------+------------------+-----+----------+----------+----------+-----+------+------+
|          id|    type|              mass| size|         x|         y|         z|   vx|    vy|    vz|
+------------+--------+------------------+-----+----------+----------+----------+-----+------+------+
|       Earth|  planet|         5.972e+24|12742|       0.0|       0.0|       0.0|  0.0|   0.0|   0.0|
|asteroid_446|asteroid| 336886339164360.7|  0.8|-143630.08|-436397.17|-626138.25|35.65| 49.13|-21.85|
|asteroid_900|asteroid| 681347178525166.1| 1.69| 875856.09|-926660.97|-953095.68|48.64|-10.68|  5.82|
|asteroid_843|asteroid|382792374092527.25| 6.88| 576717.94| 708441.61|-406502.13|23.86|-27.71|-39.67|
|asteroid_644|asteroid| 850435931469795.2| 6.65|  448828.6|-223273.45|-408345.01|24.22|-10.28| 26.99|
+------------+--------+------------------+-----+----------+----------+----------+-----+------+------+
only showing top 5 rows



In [7]:
# Conversion des colonnes en types numériques
df = df.withColumn("mass", col("mass").cast("double")) \
       .withColumn("size", col("size").cast("double")) \
       .withColumn("x", col("x").cast("double")) \
       .withColumn("y", col("y").cast("double")) \
       .withColumn("z", col("z").cast("double")) \
       .withColumn("vx", col("vx").cast("double")) \
       .withColumn("vy", col("vy").cast("double")) \
       .withColumn("vz", col("vz").cast("double"))

In [8]:
df.show(5)

+------------+--------+--------------------+-------+----------+----------+----------+-----+------+------+
|          id|    type|                mass|   size|         x|         y|         z|   vx|    vy|    vz|
+------------+--------+--------------------+-------+----------+----------+----------+-----+------+------+
|       Earth|  planet|            5.972E24|12742.0|       0.0|       0.0|       0.0|  0.0|   0.0|   0.0|
|asteroid_446|asteroid|3.368863391643607E14|    0.8|-143630.08|-436397.17|-626138.25|35.65| 49.13|-21.85|
|asteroid_900|asteroid|6.813471785251661E14|   1.69| 875856.09|-926660.97|-953095.68|48.64|-10.68|  5.82|
|asteroid_843|asteroid|3.827923740925272...|   6.88| 576717.94| 708441.61|-406502.13|23.86|-27.71|-39.67|
|asteroid_644|asteroid|8.504359314697952E14|   6.65|  448828.6|-223273.45|-408345.01|24.22|-10.28| 26.99|
+------------+--------+--------------------+-------+----------+----------+----------+-----+------+------+
only showing top 5 rows



## Analyser le dataframe

In [9]:
df.dtypes

[('id', 'string'),
 ('type', 'string'),
 ('mass', 'double'),
 ('size', 'double'),
 ('x', 'double'),
 ('y', 'double'),
 ('z', 'double'),
 ('vx', 'double'),
 ('vy', 'double'),
 ('vz', 'double')]

In [10]:
# Compter les valeurs NULL dans toutes les colonnes
df.select([sum(when(col(c).isNull(), 1).otherwise(0)).alias(c) for c in df.columns]).show()

+---+----+----+----+---+---+---+---+---+---+
| id|type|mass|size|  x|  y|  z| vx| vy| vz|
+---+----+----+----+---+---+---+---+---+---+
|  0|   0|   0|   0|  0|  0|  0|  0|  0|  0|
+---+----+----+----+---+---+---+---+---+---+



## traitement du dataframe

##  Calcul de la position future de l'astéroïde après 1 an (en secondes)

In [11]:
# Temps en secondes pour 1 an
time_in_seconds = 365 * 24 * 3600 * 5

In [12]:
# Calcul de la position future pour chaque astéroïde
df_future = df.withColumn(
    "future_x", col("x") + col("vx") * time_in_seconds
).withColumn(
    "future_y", col("y") + col("vy") * time_in_seconds
).withColumn(
    "future_z", col("z") + col("vz") * time_in_seconds
)

In [13]:
# Calcul de la distance entre l'astéroïde et la Terre (position de la Terre est (0, 0, 0))
df_distance = df_future.withColumn(
    "distance_to_earth", sqrt(col("future_x")**2 + col("future_y")**2 + col("future_z")**2)
)

In [14]:
# Filtrer pour ne garder que les astéroïdes 
df_distance = df_distance.filter(col("id") != "Earth")

In [15]:
# Vérification de la collision (distance < rayon de la Terre + petit seuil pour l'astéroïde)
collision_threshold = 6371 + 1  # Rayon de la Terre (6371 km) + 1 km pour l'astéroïde

df_collision = df_distance.withColumn(
    "collision", col("distance_to_earth") < collision_threshold
)

In [16]:
# Afficher les résultats
df_collision.select("id", "future_x", "future_y", "future_z", "distance_to_earth", "collision").show(truncate=False)

+------------+--------------------+--------------------+--------------------+---------------------+---------+
|id          |future_x            |future_y            |future_z            |distance_to_earth    |collision|
+------------+--------------------+--------------------+--------------------+---------------------+---------+
|asteroid_446|5.62114836992E9     |7.74638200283E9     |-3.44593413825E9    |1.0172423763075325E10|false    |
|asteroid_900|7.67043105609E9     |-1.68494906097E9    |9.1674450432E8      |7.906641917432114E9  |false    |
|asteroid_843|3.76282151794E9     |-4.36860435839E9    |-6.25557210213E9    |8.507391617938106E9  |false    |
|asteroid_644|3.8194584286E9      |-1.62117367345E9    |4.2553748549899993E9|5.943457068388069E9  |false    |
|asteroid_781|-5.07810045905E9    |4.66550620254E9     |-4.4054393248E8     |6.910002268783128E9  |false    |
|asteroid_909|4.70634742691E9     |-3.30647671424E9    |5.40773588742E9     |7.894688201106102E9  |false    |
|asteroid_

In [17]:
df_collision = df_collision.withColumn("collision", col("collision").cast("double"))

In [18]:
df_collision.printSchema()

root
 |-- id: string (nullable = true)
 |-- type: string (nullable = true)
 |-- mass: double (nullable = true)
 |-- size: double (nullable = true)
 |-- x: double (nullable = true)
 |-- y: double (nullable = true)
 |-- z: double (nullable = true)
 |-- vx: double (nullable = true)
 |-- vy: double (nullable = true)
 |-- vz: double (nullable = true)
 |-- future_x: double (nullable = true)
 |-- future_y: double (nullable = true)
 |-- future_z: double (nullable = true)
 |-- distance_to_earth: double (nullable = true)
 |-- collision: double (nullable = true)



## Prediction 

In [19]:
# Liste des colonnes d'entrée
input_cols = ["mass", "size", "x", "y", "z", "vx", "vy", "vz", "future_x", "future_y", "future_z", "distance_to_earth"]

# Assemblage des colonnes d'entrée en une colonne "features"
assembler = VectorAssembler(inputCols=input_cols, outputCol="features")
df_assembled = assembler.transform(df_collision)

# Appliquer la normalisation (ou standardisation) avec StandardScaler
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
scaler_model = scaler.fit(df_assembled)
df_scaled = scaler_model.transform(df_assembled)

# Afficher les données avec les caractéristiques normalisées
df_scaled.select("id", "scaled_features").show(5)


+------------+--------------------+
|          id|     scaled_features|
+------------+--------------------+
|asteroid_446|[1.11594523070702...|
|asteroid_900|[2.25698120089069...|
|asteroid_843|[1.26801022944170...|
|asteroid_644|[2.81709232882425...|
|asteroid_781|[1.56235905711047...|
+------------+--------------------+
only showing top 5 rows



In [20]:
# Diviser les données en 80% entraînement et 20% test
train_df, test_df = df_scaled.randomSplit([0.8, 0.2], seed=42)

# Afficher la taille des ensembles
print(f"Ensemble d'entraînement : {train_df.count()} lignes")
print(f"Ensemble de test : {test_df.count()} lignes")


Ensemble d'entraînement : 67 lignes
Ensemble de test : 17 lignes


In [21]:
# Vérifier les types de colonnes dans le DataFrame
df_scaled.printSchema()


root
 |-- id: string (nullable = true)
 |-- type: string (nullable = true)
 |-- mass: double (nullable = true)
 |-- size: double (nullable = true)
 |-- x: double (nullable = true)
 |-- y: double (nullable = true)
 |-- z: double (nullable = true)
 |-- vx: double (nullable = true)
 |-- vy: double (nullable = true)
 |-- vz: double (nullable = true)
 |-- future_x: double (nullable = true)
 |-- future_y: double (nullable = true)
 |-- future_z: double (nullable = true)
 |-- distance_to_earth: double (nullable = true)
 |-- collision: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- scaled_features: vector (nullable = true)



In [22]:
# Entraîner un modèle de régression logistique
lr = LogisticRegression(featuresCol="scaled_features", labelCol="collision")

# Entraînement du modèle
lr_model = lr.fit(train_df)

# Prédictions sur l'ensemble de test
predictions = lr_model.transform(test_df)

# Afficher les prédictions
predictions.select("id", "collision", "prediction", "probability").show(5)


+------------+---------+----------+-----------+
|          id|collision|prediction|probability|
+------------+---------+----------+-----------+
|asteroid_042|      0.0|       0.0|  [1.0,0.0]|
|asteroid_081|      0.0|       0.0|  [1.0,0.0]|
|asteroid_106|      0.0|       0.0|  [1.0,0.0]|
|asteroid_163|      0.0|       0.0|  [1.0,0.0]|
|asteroid_234|      0.0|       0.0|  [1.0,0.0]|
+------------+---------+----------+-----------+
only showing top 5 rows



In [23]:
# Définir l'évaluateur pour la classification binaire
evaluator = BinaryClassificationEvaluator(labelCol="collision", rawPredictionCol="prediction")

# Calculer l'AUC (Area Under Curve)
auc = evaluator.evaluate(predictions)

# Afficher l'AUC avec le nom du modèle
print(f"AUC du modèle {lr.getOrDefault('family')} : {auc}")

AUC du modèle auto : 0.0


In [24]:
from pyspark.ml.classification import RandomForestClassifier

# Créer un modèle RandomForest
rf = RandomForestClassifier(featuresCol="scaled_features", labelCol="collision")

# Entraîner le modèle
rf_model = rf.fit(train_df)

# Prédictions
predictions = rf_model.transform(test_df)

# Calculer l'AUC
auc = evaluator.evaluate(predictions)
print(f"AUC du modèle RandomForest : {auc}")


AUC du modèle RandomForest : 0.0
