## TP K-Means Clustering

**Question 1 : Instancier le client Spark Session**

In [15]:
# Importation
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import os, shutil
from folium.plugins import MarkerCluster
import folium
import configparser

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

# Instantiation
spark = SparkSession.builder \
                    .master("local") \
                    .appName("BikeDF") \
                    .getOrCreate()

**Question 2 : Créer un fichier properties.conf contenant les informations relatives à vos paramètres du programme en dur**

In [16]:
config = configparser.ConfigParser()
config.read("properties.conf")
path_to_input_data = config['Bristol-City-bike']['Input-data']
path_to_output_data = config['Bristol-City-bike']['Output-data']
num_partition_kmeans = config['Bristol-City-bike']['Kmeans-level'] 

num_partition_kmeans = int(num_partition_kmeans) # convertir en entier

**Question 3 : Importer la base**

In [17]:
bristol = spark.read.json(path_to_input_data)
bristol.show(5)


+--------------------+----------+----------+--------------------+------+
|             address|  latitude| longitude|                name|number|
+--------------------+----------+----------+--------------------+------+
|Lower River Tce /...|-27.482279|153.028723|122 - LOWER RIVER...|   122|
|Main St / Darragh St| -27.47059|153.036046|91 - MAIN ST / DA...|    91|
|Sydney St Ferry T...|-27.474531|153.042728|88 - SYDNEY ST FE...|    88|
|Browne St / James St|-27.461881|153.046986|75 - BROWNE ST / ...|    75|
|Kurilpa Point / M...|-27.469658|153.016696|98 - KURILPA POIN...|    98|
+--------------------+----------+----------+--------------------+------+
only showing top 5 rows



**Question 4 : Créer un nouveau data frame 'Kmeans_df' contenant seulement les variables latitude et longitude** 


In [18]:
Kmeans_df = bristol['latitude', 'longitude'] 
Kmeans_df.show(5)

+----------+----------+
|  latitude| longitude|
+----------+----------+
|-27.482279|153.028723|
| -27.47059|153.036046|
|-27.474531|153.042728|
|-27.461881|153.046986|
|-27.469658|153.016696|
+----------+----------+
only showing top 5 rows



**Question 5 : KMeans**

In [33]:
features = ("longitude", "latitude")
kmeans = KMeans().setK(num_partition_kmeans).setSeed(1)
assembler = VectorAssembler(inputCols = features, outputCol = "features")
dataset = assembler.transform(Kmeans_df)
model = kmeans.fit(dataset)
fitted = model.transform(dataset)


**Question 6 :  Quels sont les noms des colonnes de fitted ?**

In [7]:
fitted.columns

['latitude', 'longitude', 'features', 'prediction']

<font color="blue"> Les colonnes sont bien les variables latitude, longitude, features et predictions.

**Question 7 :  Déterminer les longitudes et latitudes moyennes pour chaque groupe en utilisant spark DSL et SQL. comparer les résultat**

In [20]:
fitted.createOrReplaceTempView("fittedSQL") # transformation du data frame en table

In [21]:
#SQL
spark.sql("""select mean(longitude) as Moyenne_longitude, mean(latitude) as Moyenne_latitude from fittedSQL""").show() 


#DSL
fitted.agg(F.mean("longitude").alias("Moyenne_longitude"),F.mean("latitude").alias("Moyenne_latitude")) \
          .show()

+------------------+------------------+
| Moyenne_longitude|  Moyenne_latitude|
+------------------+------------------+
|153.02508301342277|-27.47130457718122|
+------------------+------------------+

+------------------+------------------+
| Moyenne_longitude|  Moyenne_latitude|
+------------------+------------------+
|153.02508301342277|-27.47130457718122|
+------------------+------------------+



<font color="blue"> Nous obtenons les mêmes résulats.

**Question 8 : Faire une visualisation dans une map avec le package leaflet**

In [34]:
#Coordonnées de la ville de Brisbane
Bristol_coords = [-27.4710107, 153.0234489]

#Création de la map sur la ville de Brisbane
map = folium.Map(location = Bristol_coords, zoom_start = 13)

#Coord des points que l'on veut
for i in range(fitted.count()):
    if fitted.collect()[i][3] == 0:
        coord = [fitted.collect()[i][0], fitted.collect()[i][1]]
        folium.Marker(coord, icon=folium.Icon(color="purple")).add_to(map)
    elif fitted.collect()[i][3] == 1:
        coord = [fitted.collect()[i][0], fitted.collect()[i][1]]
        folium.Marker(coord, icon=folium.Icon(color="blue")).add_to(map)
    else:
        coord = [fitted.collect()[i][0], fitted.collect()[i][1]]
        folium.Marker(coord, icon=folium.Icon(color="orange")).add_to(map)

#Affiche la map 
map

In [36]:
#Exportation de la carte sous format html
map.save("output/Bristol-city-bike.html")

**Question 9 : Exporter la data frame fitted après élimination de la colonne  features, dans le répertoire path_to_output_data**
    


In [28]:
fitted = fitted.drop('features') 
fitted.show(5)

if os.path.exists(path_to_output_data):
        shutil.rmtree(path_to_output_data)

fitted.write.csv(path_to_output_data, sep = ';', header = 'true')

+----------+----------+----------+
|  latitude| longitude|prediction|
+----------+----------+----------+
|-27.482279|153.028723|         2|
| -27.47059|153.036046|         2|
|-27.474531|153.042728|         1|
|-27.461881|153.046986|         1|
|-27.469658|153.016696|         2|
+----------+----------+----------+
only showing top 5 rows



**Fin de la session Spark**

In [13]:
spark.stop()