# <font color='blue'>**PROJET KMEANS** </font>

# Importation des packages

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# 1) Instanciation de SparkSession

In [2]:
# Create (or reuse) the notebook's SparkSession: local master, application name "Kmeans".
Spark = (
    SparkSession.builder
    .master("local")
    .appName("Kmeans")
    .getOrCreate()
)

# 2) Configuration

In [3]:
import configparser

# Read the run parameters (input path, output path, number of clusters) from
# the [Brisbane_City_Bike] section of properties.conf.
config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it actually parsed; fail here with a clear message instead of
# hitting a KeyError on the section lookup below.
if not config.read('properties.conf'):
    raise FileNotFoundError("Configuration file 'properties.conf' was not found or could not be read")
path_to_input_data = config['Brisbane_City_Bike']['Input_data']
path_to_output_data = config['Brisbane_City_Bike']['Output_data']
# getint() converts the option to int, as the explicit int(...) call did.
num_partition_kmeans = config.getint('Brisbane_City_Bike', 'Kmeans_level')

# 3) Importation

In [4]:
# Load the input records from the JSON file at path_to_input_data.
# The former "header" and "delimiter" options were removed: they are CSV
# reader options and have no effect on the JSON data source.
brisbane = Spark.read.json(path_to_input_data)
# Preview the first rows of the loaded DataFrame.
brisbane.show()

+--------------------+----------+----------+--------------------+------+
|             address|  latitude| longitude|                name|number|
+--------------------+----------+----------+--------------------+------+
|Lower River Tce /...|-27.482279|153.028723|122 - LOWER RIVER...|   122|
|Main St / Darragh St| -27.47059|153.036046|91 - MAIN ST / DA...|    91|
|Sydney St Ferry T...|-27.474531|153.042728|88 - SYDNEY ST FE...|    88|
|Browne St / James St|-27.461881|153.046986|75 - BROWNE ST / ...|    75|
|Kurilpa Point / M...|-27.469658|153.016696|98 - KURILPA POIN...|    98|
|Montague Rd / Ski...| -27.48172| 153.00436|109 - MONTAGUE RD...|   109|
|Macquarie St / Gu...|-27.493626|153.001482|149 - MACQUARIE S...|   149|
|Bi-centennial Bik...|-27.476076|153.002459|139 - BI-CENTENNI...|   139|
|Sir William McGre...|-27.493963|153.011938|24 - SIR WILLIAM ...|    24|
|Vulture St / Trib...|-27.482197|153.020894|117 - VULTURE ST ...|   117|
|Lamington St / Re...|-27.465226|153.050864|73 - LA

# 4) Création Kmeans_df

In [5]:
# Keep only the two coordinate columns; they are the inputs to the clustering.
Kmeans_df = brisbane.select(brisbane['latitude'], brisbane['longitude'])
# Preview the reduced DataFrame.
Kmeans_df.show()

+----------+----------+
|  latitude| longitude|
+----------+----------+
|-27.482279|153.028723|
| -27.47059|153.036046|
|-27.474531|153.042728|
|-27.461881|153.046986|
|-27.469658|153.016696|
| -27.48172| 153.00436|
|-27.493626|153.001482|
|-27.476076|153.002459|
|-27.493963|153.011938|
|-27.482197|153.020894|
|-27.465226|153.050864|
|-27.468447|153.024662|
|-27.473021|153.025988|
|-27.457825|153.036866|
| -27.48148| 153.02368|
|-27.467464|153.022094|
|-27.499963|153.017633|
|-27.490776|152.994747|
|-27.458199|153.041688|
|-27.481808|153.025477|
+----------+----------+
only showing top 20 rows



# 5) K-means

In [6]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

In [7]:
# Names of the input columns assembled into the feature vector (longitude first).
features = 'longitude', 'latitude'

In [8]:
# K-means estimator: the number of clusters comes from the configuration
# (num_partition_kmeans); the seed is fixed to 1 so runs are repeatable.
kmeans = KMeans(k=num_partition_kmeans, seed=1)

In [9]:
# Assembler that packs the columns listed in `features` into one vector column named "features".
assembler = VectorAssembler().setInputCols(features).setOutputCol("features")

In [10]:
# Append the assembled "features" vector column to the coordinate DataFrame.
dataset = assembler.transform(dataset=Kmeans_df)
# Preview the result, including the new vector column.
dataset.show()

+----------+----------+--------------------+
|  latitude| longitude|            features|
+----------+----------+--------------------+
|-27.482279|153.028723|[153.028723,-27.4...|
| -27.47059|153.036046|[153.036046,-27.4...|
|-27.474531|153.042728|[153.042728,-27.4...|
|-27.461881|153.046986|[153.046986,-27.4...|
|-27.469658|153.016696|[153.016696,-27.4...|
| -27.48172| 153.00436|[153.00436,-27.48...|
|-27.493626|153.001482|[153.001482,-27.4...|
|-27.476076|153.002459|[153.002459,-27.4...|
|-27.493963|153.011938|[153.011938,-27.4...|
|-27.482197|153.020894|[153.020894,-27.4...|
|-27.465226|153.050864|[153.050864,-27.4...|
|-27.468447|153.024662|[153.024662,-27.4...|
|-27.473021|153.025988|[153.025988,-27.4...|
|-27.457825|153.036866|[153.036866,-27.4...|
| -27.48148| 153.02368|[153.02368,-27.48...|
|-27.467464|153.022094|[153.022094,-27.4...|
|-27.499963|153.017633|[153.017633,-27.4...|
|-27.490776|152.994747|[152.994747,-27.4...|
|-27.458199|153.041688|[153.041688,-27.4...|
|-27.48180

In [11]:
# Fit the K-means estimator on the assembled dataset.
model = kmeans.fit(dataset=dataset)

In [12]:
# Apply the fitted model to the same dataset; this adds the "prediction" column.
fitted = model.transform(dataset=dataset)

In [13]:
# Preview the clustered rows (first 20, long values truncated — the defaults).
fitted.show(n=20, truncate=True)

+----------+----------+--------------------+----------+
|  latitude| longitude|            features|prediction|
+----------+----------+--------------------+----------+
|-27.482279|153.028723|[153.028723,-27.4...|         2|
| -27.47059|153.036046|[153.036046,-27.4...|         2|
|-27.474531|153.042728|[153.042728,-27.4...|         1|
|-27.461881|153.046986|[153.046986,-27.4...|         1|
|-27.469658|153.016696|[153.016696,-27.4...|         2|
| -27.48172| 153.00436|[153.00436,-27.48...|         0|
|-27.493626|153.001482|[153.001482,-27.4...|         0|
|-27.476076|153.002459|[153.002459,-27.4...|         0|
|-27.493963|153.011938|[153.011938,-27.4...|         0|
|-27.482197|153.020894|[153.020894,-27.4...|         2|
|-27.465226|153.050864|[153.050864,-27.4...|         1|
|-27.468447|153.024662|[153.024662,-27.4...|         2|
|-27.473021|153.025988|[153.025988,-27.4...|         2|
|-27.457825|153.036866|[153.036866,-27.4...|         1|
| -27.48148| 153.02368|[153.02368,-27.48...|    

# 6) Les colonnes de fitted

Les colonnes de `fitted` sont `latitude`, `longitude`, `features` et `prediction` (vérifié avec `fitted.show()`).

# 7) Calcul des moyennes par cluster

In [14]:
# SQL variant.
# Register the clustered DataFrame as a temporary view so it can be queried by name.
fitted.createOrReplaceTempView("fittedSQL")
# Mean latitude and longitude per cluster, sorted by cluster id.
Spark.sql(
    "Select prediction , mean(latitude) as Moyenne_latitude, "
    "mean(longitude) as Moyenne_longitude "
    "from fittedSQL group by prediction order by prediction "
).show()

+----------+-------------------+------------------+
|prediction|   Moyenne_latitude| Moyenne_longitude|
+----------+-------------------+------------------+
|         0|-27.481218536585374|153.00572882926832|
|         1|-27.460240636363633|153.04186302272726|
|         2| -27.47255990624999|   153.02594553125|
+----------+-------------------+------------------+



In [15]:
# DSL variant: mean latitude and longitude per cluster, sorted by cluster id.
# The sort is applied after the aggregation, because ordering the rows before
# groupBy() does not guarantee the order of the aggregated result.
fitted_Kmeans = (
    fitted.groupBy(F.col("prediction"))
    .agg(
        F.mean("latitude").alias("Moyenne_lat"),
        F.mean("longitude").alias("Moyenne_lon"),
    )
    .orderBy(F.col("prediction"))
)
# show() returns None, so it is called separately to keep the aggregated
# DataFrame bound to fitted_Kmeans.
fitted_Kmeans.show()

+----------+-------------------+------------------+
|prediction|        Moyenne_lat|       Moyenne_lon|
+----------+-------------------+------------------+
|         0|-27.481218536585374|153.00572882926832|
|         1|-27.460240636363633|153.04186302272726|
|         2| -27.47255990624999|   153.02594553125|
+----------+-------------------+------------------+



On trouve les mêmes résultats avec DSL et SQL.

# 8) Affichage des coordonnées sur une carte leaflet

In [19]:
import folium
from folium.plugins import MarkerCluster
import pandas as pd

In [21]:
# Mean [latitude, longitude] of each cluster, computed from the clustered data
# with the same aggregation as step 7. Computing them here, rather than pasting
# values from an earlier run's output, keeps the coordinates in sync with the
# current data and model.
cluster_means = {
    row["prediction"]: [row["Moyenne_lat"], row["Moyenne_lon"]]
    for row in fitted.groupBy("prediction")
    .agg(
        F.mean("latitude").alias("Moyenne_lat"),
        F.mean("longitude").alias("Moyenne_lon"),
    )
    .collect()
}
# Three named clusters are exposed; a KeyError is raised here if the model
# produced fewer than three clusters.
Cluster_0 = cluster_means[0]
Cluster_1 = cluster_means[1]
Cluster_2 = cluster_means[2]

In [29]:
# Build the map centred on cluster 2, then add one labelled marker per cluster.
my_map = folium.Map(
    location=Cluster_2,
    zoom_start=13,
    control_scale=True,
    prefer_canvas=True,
)
for cluster_label, cluster_position in (
    ('Cluster 0', Cluster_0),
    ('Cluster 1', Cluster_1),
    ('Cluster 2', Cluster_2),
):
    folium.Marker(cluster_position, popup=cluster_label).add_to(my_map)

# Leave the map as the last expression so the notebook renders it.
my_map

# 9) Exportation 

In [None]:
# Keep only the cluster id and the coordinates for export (drops the vector column).
fitted_clean = fitted.select(fitted['prediction'], fitted['latitude'], fitted['longitude'])

In [None]:
# Export the cleaned result as JSON to path_to_output_data. coalesce(1) merges
# the data into a single partition before writing.
# mode('overwrite') replaces any previous export at that path, so this cell can
# be re-run without deleting the output folder by hand (the default save mode
# raises an error when the path already exists).
fitted_clean.coalesce(1).write.mode('overwrite').format('json').save(path_to_output_data)