# SPARK K-MEANS CLUSTERING

## Importation des bibliothèques et des modules

In [12]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
import configparser

# pip install folium
import folium
import pandas as pd

## Instanciation du client Spark Session.

In [13]:
spark = SparkSession.builder\
                    .master("local[*]")\
                    .appName("K_means_Clustering")\
                    .getOrCreate()

## Création du fichier de configuration.

Le code ci-dessous permet de créer un fichier de configuration nommé *properties.conf*. Celui-ci est composé d'une section nommée *Brisbane_City_bike* et de trois options définies par un nom et avec une valeur. Il est également possible de créer manuellement le fichier de configuration via un traitement de texte ou Jupyter.

In [14]:
Fichier_Config = open(r'../configuration/properties.conf','w')
Fichier_Config.write("[Brisbane_City_bike] \n") #Nom de la section
Fichier_Config.write("Input_data= ../data/Brisbane-city-bike.json \n")
Fichier_Config.write("Output_data= ../exported/ \n")
Fichier_Config.write("Kmeans_level= 3 \n")
Fichier_Config.close()

## Utilisation du fichier de configuration pour récupérer options.

In [15]:
config = configparser.ConfigParser()

#Lecture du fichier properties.conf
config.read(r'../configuration/properties.conf')

# Récupération et assignation dans une variable de la valeur de chaque option de la section Brisbane_City_bike.
path_to_input_data = config['Brisbane_City_bike']['Input_data']
path_to_output_data = config['Brisbane_City_bike']['Output_data']
num_partition_kmeans = config['Brisbane_City_bike']["Kmeans_level"]

## Importation de la base de données *Brisbane_City_bike.json*.


In [16]:
# Importation de la base de données
Brisbane_Cb = spark.read\
                   .json(path_to_input_data)
# Aperçu des 10 premières lignes de la base de données.
Brisbane_Cb.show(10)

# Vérification du types des variables
Brisbane_Cb.printSchema()

+--------------------+----------+----------+--------------------+------+
|             address|  latitude| longitude|                name|number|
+--------------------+----------+----------+--------------------+------+
|Lower River Tce /...|-27.482279|153.028723|122 - LOWER RIVER...|   122|
|Main St / Darragh St| -27.47059|153.036046|91 - MAIN ST / DA...|    91|
|Sydney St Ferry T...|-27.474531|153.042728|88 - SYDNEY ST FE...|    88|
|Browne St / James St|-27.461881|153.046986|75 - BROWNE ST / ...|    75|
|Kurilpa Point / M...|-27.469658|153.016696|98 - KURILPA POIN...|    98|
|Montague Rd / Ski...| -27.48172| 153.00436|109 - MONTAGUE RD...|   109|
|Macquarie St / Gu...|-27.493626|153.001482|149 - MACQUARIE S...|   149|
|Bi-centennial Bik...|-27.476076|153.002459|139 - BI-CENTENNI...|   139|
|Sir William McGre...|-27.493963|153.011938|24 - SIR WILLIAM ...|    24|
|Vulture St / Trib...|-27.482197|153.020894|117 - VULTURE ST ...|   117|
+--------------------+----------+----------+-------

## Création d'un nouveau data frame *Kmeans-df* contenant seulement les variables latitude et longitude. 


In [17]:
# Selection des colonnes latitude et longitude
Kmeans_df = Brisbane_Cb.select(col("latitude"), col("longitude"))

#Aperçu du data frame
Kmeans_df.show(10)

+----------+----------+
|  latitude| longitude|
+----------+----------+
|-27.482279|153.028723|
| -27.47059|153.036046|
|-27.474531|153.042728|
|-27.461881|153.046986|
|-27.469658|153.016696|
| -27.48172| 153.00436|
|-27.493626|153.001482|
|-27.476076|153.002459|
|-27.493963|153.011938|
|-27.482197|153.020894|
+----------+----------+
only showing top 10 rows



## K means

In [18]:
# Création d'un tuple nommé features contenant les éléments longitude et latitude
features = ("longitude","latitude") 

# Création d'une colonne vectorielle nommée features composée des valeurs de longitude et latitude
assembler = VectorAssembler(inputCols=features, outputCol="features")
dataset = assembler.transform(Kmeans_df)

# Entrainement de l'algorithme avec K = 3 et prédiction
kmeans = KMeans().setK(int(num_partition_kmeans))\
                 .setSeed(1)
model = kmeans.fit(dataset)
fitted = model.transform(dataset)

## Quels sont les noms des colonnes de fitted ?

In [19]:
print("Le nom des colonnes de la base fitted sont:")
fitted.columns

Le nom des colonnes de la base fitted sont:


['latitude', 'longitude', 'features', 'prediction']

## Détermination de la longitude et latitude moyenne pour chaque groupe.

In [20]:
## DSL 
fitted.groupBy(col("prediction")).agg(avg(col("latitude")).alias("LatitudeMoyenne"), avg(col("longitude")).alias("LongitudeMoyenne")).show()

## SQL
fitted.createOrReplaceTempView("data_SQL")
spark.sql("""select prediction, mean(latitude) as LatitudeMoyenne, mean(longitude) as LongitudeMoyenne from data_SQL group by prediction """).show()

+----------+-------------------+------------------+
|prediction|    LatitudeMoyenne|  LongitudeMoyenne|
+----------+-------------------+------------------+
|         1|-27.460240636363633|153.04186302272726|
|         2| -27.47255990624999|   153.02594553125|
|         0|-27.481218536585374|153.00572882926832|
+----------+-------------------+------------------+

+----------+-------------------+------------------+
|prediction|    LatitudeMoyenne|  LongitudeMoyenne|
+----------+-------------------+------------------+
|         1|-27.460240636363633|153.04186302272726|
|         2| -27.47255990624999|   153.02594553125|
|         0|-27.481218536585374|153.00572882926832|
+----------+-------------------+------------------+



On constate que la moyenne de la variable latitude pour les trois catégories est très proche. En effet, les moyennes de la latitude sont environ égales à -27,4 ; la différence ne s'observe qu'au centième. En regardant plus précisément, on constate que la latitude moyenne de la classe 1 est la plus haute et inversement pour la classe 0 qui possède la moyenne la plus faible. 
Le même constat peut-être fait sur les moyennes de la longitude : les moyennes sont très proches; la moyenne de la classe 1 est la moyenne la plus haute et la classe 0 possède la moyenne la plus faible.

## Visualisation dans une carte

In [10]:
# Creation de la carte
Map = folium.Map(location = [-27.47255990624999, 153.02594553125], zoom_start = 13)

# Transformation de la base de données fitted en data frame Pandas
Fitted_pd = fitted.toPandas()

# Création d'une fonction qui attribut une couleur différentes selon le cluster.
def color(row):
    if row['prediction'] == 1:
        color= "red"
    elif row['prediction'] == 2:
        color= "green"
    else:
        color = "blue"
    return color

# Boucle pour afficher sur la carte l'ensemble des emplacements de vélo
for index, row in Fitted_pd.iterrows():
    folium.Marker(location = [row.loc['latitude'], row.loc['longitude']], popup = "I'm here", icon = folium.Icon(color(row))).add_to(Map)

# Affichage de la carte
Map

## Exportation du data frame fitted dans le répertoire *path_to_output_data*

In [11]:
# Supression de la colonne features
fitted = fitted.drop(col("features"))
# Exportation du data frame fitted au format CSV
fitted.toPandas().to_csv(path_to_output_data + "Resultat_Kmeans.csv")