In [None]:
# Installation de Java

!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Installation du fichier Spark

!wget -q https://downloads.apache.org/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz

# Extraction a partir du fichier Zip

!tar xf spark-3.3.2-bin-hadoop3.tgz

# Installation du module Spark pour permettre d'utiliser Spark avec Python

!pip install -q findspark

# Création d'une session Spark

In [None]:
# Set up the working environment: point Spark at the Java and Spark installations

import os
import multiprocessing as mp
import threading as th

os.environ.update(
    JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64",
    SPARK_HOME="/content/spark-3.3.2-bin-hadoop3",
)

# Import findspark and initialise it before importing pyspark

import findspark
findspark.init()

from pyspark.sql import SparkSession

# Memory setting applied to both the executors and the driver
MAX_MEMORY = "5g"

# Build the session, or reuse the one that already exists
spark = (
    SparkSession.builder
    .appName("TP_Taibi_Toumi")
    .config("spark.executor.memory", MAX_MEMORY)
    .config("spark.driver.memory", MAX_MEMORY)
    .getOrCreate()
)

In [None]:
# Echo the session object so the notebook displays its representation
spark

In [None]:
from pyspark import SparkContext
from math import sqrt
import time
import pandas as pd
# Wall-clock reference point; it is subtracted from end_time later to report the elapsed time
start_time = time.time()

## Création d'un dataframe sur IRIS

In [None]:
# Load IRIS.csv (path relative to the current working directory) into a pandas DataFrame
df2 = pd.read_csv('IRIS.csv')

In [None]:
# Print the pandas DataFrame as plain text
print(df2)

     sepal_length  sepal_width  petal_length  petal_width         species
0             5.1          3.5           1.4          0.2     Iris-setosa
1             4.9          3.0           1.4          0.2     Iris-setosa
2             4.7          3.2           1.3          0.2     Iris-setosa
3             4.6          3.1           1.5          0.2     Iris-setosa
4             5.0          3.6           1.4          0.2     Iris-setosa
..            ...          ...           ...          ...             ...
145           6.7          3.0           5.2          2.3  Iris-virginica
146           6.3          2.5           5.0          1.9  Iris-virginica
147           6.5          3.0           5.2          2.0  Iris-virginica
148           6.2          3.4           5.4          2.3  Iris-virginica
149           5.9          3.0           5.1          1.8  Iris-virginica

[150 rows x 5 columns]


## Création d'un DataFrame Spark

In [None]:
# Convert the pandas DataFrame into a Spark DataFrame
spark_df = spark.createDataFrame(df2)
# Print the schema (column names, types and nullability) of the Spark DataFrame
spark_df.printSchema()

root
 |-- sepal_length: double (nullable = true)
 |-- sepal_width: double (nullable = true)
 |-- petal_length: double (nullable = true)
 |-- petal_width: double (nullable = true)
 |-- species: string (nullable = true)



In [None]:
# Display the first 5 rows of the Spark DataFrame
spark_df.show(5)

+------------+-----------+------------+-----------+-----------+
|sepal_length|sepal_width|petal_length|petal_width|    species|
+------------+-----------+------------+-----------+-----------+
|         5.1|        3.5|         1.4|        0.2|Iris-setosa|
|         4.9|        3.0|         1.4|        0.2|Iris-setosa|
|         4.7|        3.2|         1.3|        0.2|Iris-setosa|
|         4.6|        3.1|         1.5|        0.2|Iris-setosa|
|         5.0|        3.6|         1.4|        0.2|Iris-setosa|
+------------+-----------+------------+-----------+-----------+
only showing top 5 rows



# Parallélisation du travail de Spark sur les cœurs du processeur

In [None]:
# SparkContext behind the session
sc = spark.sparkContext
# Distribute the rows of the pandas DataFrame (as a NumPy array) into an RDD
rdd = sc.parallelize(df2.to_numpy())
# Report how many partitions the RDD was split into
print(f"nombre de partitions : {rdd.getNumPartitions()}")

nombre de partitions : 2


# Première itération de k-means

In [None]:
# Number of clusters (k) used by k-means
NB_CLUSTERS = 3
# Fixed seed so the same initial centroids are drawn on every run
CENTROID_SEED = 42
# Feature columns that make up a centroid, in coordinate order
FEATURE_COLUMNS = ['sepal_length', 'sepal_width', 'petal_length', 'petal_width']

rdd_data = spark_df.rdd
# Sample NB_CLUSTERS rows without replacement to serve as the starting centroids
sampled_rows = rdd_data.takeSample(False, NB_CLUSTERS, seed=CENTROID_SEED)
# Turn each sampled Row into a plain list of coordinates; the position in the
# outer list is the centroid number (0 = first centroid)
initial_centroids = [[row[col] for col in FEATURE_COLUMNS] for row in sampled_rows]
# Print centroids values
print(initial_centroids)

[[6.6, 3.0, 4.4, 1.4], [5.4, 3.4, 1.7, 0.2], [5.0, 3.6, 1.4, 0.2]]


## Calcul de la distance euclidienne et recherche du centroïde le plus proche pour k-means

In [None]:
class Mapper:
  """Map step of k-means: assigns a point to its nearest centroid."""

  def dist_centroids(self, dict_centroids, point):
    """Return the index of the centroid closest to ``point``.

    Args:
      dict_centroids: collection of centroids addressable by the integers
        0..len-1 (a list, or a dict keyed by centroid number). Each centroid
        holds four coordinates in the order sepal_length, sepal_width,
        petal_length, petal_width.
      point: object supporting lookup by the keys 'sepal_length',
        'sepal_width', 'petal_length' and 'petal_width'.

    Returns:
      A tuple ``(nearest_centroid, point)`` where ``nearest_centroid`` is the
      index of the centroid with the smallest Euclidean distance to ``point``.
      On ties, the lowest index wins.

    Raises:
      ValueError: if ``dict_centroids`` is empty.
    """
    if len(dict_centroids) == 0:
      raise ValueError("dict_centroids must contain at least one centroid")
    # Start with an infinite minimum so the first computed distance always replaces it
    min_dist = float("inf")
    # Nearest centroid defaults to index 0
    nearest_centroid = 0
    for i in range(len(dict_centroids)):
      # Retrieve centroid number i
      centroid = dict_centroids[i]
      # Euclidean distance between the point and this centroid; this must be
      # evaluated inside the loop so that every centroid is compared
      distance = sqrt(
          (point['sepal_length'] - centroid[0]) ** 2
          + (point['sepal_width'] - centroid[1]) ** 2
          + (point['petal_length'] - centroid[2]) ** 2
          + (point['petal_width'] - centroid[3]) ** 2
      )
      if distance < min_dist:
        min_dist = distance
        nearest_centroid = i
    return (nearest_centroid, point)

# Application du k-means avec une architecture parallèle

In [None]:
# Sanity check: run the mapper on the first point of the dataset
m = Mapper()
p = rdd_data.take(3)
pt = p[0]
result = m.dist_centroids(initial_centroids, pt)
print(result)
# Print the number of CPUs reported by multiprocessing.cpu_count()
print(mp.cpu_count())
# Print the number of Python threads currently alive in this process
print(th.active_count())

(2, Row(sepal_length=5.1, sepal_width=3.5, petal_length=1.4, petal_width=0.2, species='Iris-setosa'))
2
13


In [None]:
print(initial_centroids)
print(pt)
# Print the number of partitions of the data RDD
print("Nombre de partitions de l'ensemble de données : ", rdd_data.getNumPartitions())
# Stop the stopwatch
end_time = time.time()

[[6.6, 3.0, 4.4, 1.4], [5.4, 3.4, 1.7, 0.2], [5.0, 3.6, 1.4, 0.2]]
Row(sepal_length=5.1, sepal_width=3.5, petal_length=1.4, petal_width=0.2, species='Iris-setosa')
Nombre de partitions de l'ensemble de données :  2


# Affichage du temps d'exécution

In [None]:
# Print the elapsed time
print("Temps d'exécution : ", end_time - start_time, "secondes")
# Tag every point with the index of its nearest centroid: (centroid_index, point).
# dist_centroids receives the whole centroid collection and scans it itself,
# so it is called exactly once per point.
new_rdd = rdd_data.map(lambda x: m.dist_centroids(initial_centroids, x))
# NOTE: map() is lazy and nothing is computed here; any action on new_rdd has
# to run before the session is stopped below.
# Stop the SparkSession
spark.stop()


Temps d'exécution :  24.681127071380615 secondes


In [None]:
# Mount Google Drive at /content/drive (requires the google.colab package)
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
