**SPARK K-MEANS CLUSTERING**


In [39]:
##############################################################

# Contributeur : OUPRAXAY Philippe, TOURE Almamy Moustapha
# 22/01/2021
# Master2 SEP- Eco

##############################################################

In [132]:
import configparser
import os

import findspark
# Locate the Spark installation before importing pyspark: prefer SPARK_HOME
# when it is set, otherwise fall back to the local Windows install.
# The raw string avoids the invalid "\s" escape sequence of "C:\spark".
findspark.init(os.environ.get("SPARK_HOME", r"C:\spark"))

import folium
import pandas as pd
import pyspark.sql.functions as F
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession

**1) INSTANCIATION DE LA SESSION SPARK**

In [133]:

# Create the Spark session, or reuse the one already running in this kernel.
spark = (
    SparkSession.builder
    .appName('SPARK K-MEANS')
    .getOrCreate()
)

**2) Lecture du fichier de configuration properties.conf**



In [134]:

CONFIG_PATH = "properties.conf"

config = configparser.ConfigParser()
config_files_read = config.read(CONFIG_PATH)
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files actually read; fail fast here instead of hitting an opaque KeyError
# when the next cell looks up the 'Bristol-City-bike' section.
if not config_files_read:
    raise FileNotFoundError(f"Configuration file not found: {CONFIG_PATH}")
config_files_read



['properties.conf']

In [137]:
# Job parameters, read from the [Bristol-City-bike] section of properties.conf.
bike_section = config['Bristol-City-bike']
path_to_input_data = bike_section['Input_data']    # JSON input read in step 3
path_to_output_data = bike_section['Output_data']  # target of the CSV export (step 9)
# Despite its name, this value is the number of clusters K given to KMeans.
num_partition_kmeans = int(bike_section['Kmeans_level'])

**3) Importer le fichier JSON avec Spark**

In [138]:
# Load the station records; Spark infers the schema from the JSON file.
bristol = spark.read.format("json").load(path_to_input_data)
bristol.show(n=5)

+--------------------+----------+----------+--------------------+------+
|             address|  latitude| longitude|                name|number|
+--------------------+----------+----------+--------------------+------+
|Lower River Tce /...|-27.482279|153.028723|122 - LOWER RIVER...|   122|
|Main St / Darragh St| -27.47059|153.036046|91 - MAIN ST / DA...|    91|
|Sydney St Ferry T...|-27.474531|153.042728|88 - SYDNEY ST FE...|    88|
|Browne St / James St|-27.461881|153.046986|75 - BROWNE ST / ...|    75|
|Kurilpa Point / M...|-27.469658|153.016696|98 - KURILPA POIN...|    98|
+--------------------+----------+----------+--------------------+------+
only showing top 5 rows



**4) Création d'un nouveau DataFrame `Kmeansdf` contenant les variables latitude et longitude**

In [139]:
# Keep only the two coordinate columns that will be clustered.
Kmeansdf = bristol.select(F.col('latitude'), F.col('longitude'))
Kmeansdf.show(5)

+----------+----------+
|  latitude| longitude|
+----------+----------+
|-27.482279|153.028723|
| -27.47059|153.036046|
|-27.474531|153.042728|
|-27.461881|153.046986|
|-27.469658|153.016696|
+----------+----------+
only showing top 5 rows



In [140]:
# Alternative syntax: indexing a DataFrame with a list of column names is
# equivalent to calling select() with those names.
coordinate_columns = ['latitude', 'longitude']
Kmeansdf = bristol[coordinate_columns]
Kmeansdf.show(5)

+----------+----------+
|  latitude| longitude|
+----------+----------+
|-27.482279|153.028723|
| -27.47059|153.036046|
|-27.474531|153.042728|
|-27.461881|153.046986|
|-27.469658|153.016696|
+----------+----------+
only showing top 5 rows



**5) Kmeans**

In [141]:
# Pack the coordinates into one vector column named "features",
# in the order (longitude, latitude).
features = ('longitude', 'latitude')
assembler = VectorAssembler(inputCols=features, outputCol="features")
dataset = assembler.transform(Kmeansdf)

# K comes from properties.conf; the fixed seed makes the initialisation
# reproducible between runs.
kmeans = KMeans(k=num_partition_kmeans, seed=1)
model = kmeans.fit(dataset)

# transform() appends a "prediction" column with each row's cluster index.
fitted = model.transform(dataset)
fitted.show()

+----------+----------+--------------------+----------+
|  latitude| longitude|            features|prediction|
+----------+----------+--------------------+----------+
|-27.482279|153.028723|[153.028723,-27.4...|         2|
| -27.47059|153.036046|[153.036046,-27.4...|         2|
|-27.474531|153.042728|[153.042728,-27.4...|         1|
|-27.461881|153.046986|[153.046986,-27.4...|         1|
|-27.469658|153.016696|[153.016696,-27.4...|         2|
| -27.48172| 153.00436|[153.00436,-27.48...|         0|
|-27.493626|153.001482|[153.001482,-27.4...|         0|
|-27.476076|153.002459|[153.002459,-27.4...|         0|
|-27.493963|153.011938|[153.011938,-27.4...|         0|
|-27.482197|153.020894|[153.020894,-27.4...|         2|
|-27.465226|153.050864|[153.050864,-27.4...|         1|
|-27.468447|153.024662|[153.024662,-27.4...|         2|
|-27.473021|153.025988|[153.025988,-27.4...|         2|
|-27.457825|153.036866|[153.036866,-27.4...|         1|
| -27.48148| 153.02368|[153.02368,-27.48...|    

**6) Noms des colonnes de fitted**

In [142]:
# Column names of the clustered DataFrame, read from its schema.
fitted.schema.names

['latitude', 'longitude', 'features', 'prediction']

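Les colonnes de `fitted` sont : `latitude`, `longitude`, `features` (vecteur [longitude, latitude] construit par le VectorAssembler) et `prediction` (numéro du cluster attribué par K-means).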

**7) Déterminer les latitudes et longitudes moyennes de chaque groupe avec Spark DSL puis Spark SQL, et comparer les résultats**



**SPARK DSL**

In [143]:
# Mean latitude / longitude of the stations in each cluster, using the
# DataFrame DSL. The latitude alias is spelled "moy_latitude".
(
    fitted.groupby('prediction')
    .agg(F.mean('latitude').alias('moy_latitude'),
         F.mean('longitude').alias('moy_longitude'))
    .orderBy('prediction')
    .show()
)

+----------+-------------------+------------------+
|prediction|       moy_latitute|     moy_longitude|
+----------+-------------------+------------------+
|         0|-27.481218536585374|153.00572882926832|
|         1|-27.460240636363633|153.04186302272726|
|         2| -27.47255990624999|   153.02594553125|
+----------+-------------------+------------------+



**SPARK SQL**

In [144]:
# Same per-cluster aggregation expressed in Spark SQL on a temporary view.
# Both aliases use the same lowercase "moy_*" naming.
fitted.createOrReplaceTempView("fitted")
spark.sql("""
SELECT prediction,
       avg(latitude)  AS moy_latitude,
       avg(longitude) AS moy_longitude
FROM fitted
GROUP BY prediction
ORDER BY prediction
""").show()

+----------+-------------------+------------------+
|prediction|       Moy_latitude|     moy_longitude|
+----------+-------------------+------------------+
|         0|-27.481218536585374|153.00572882926832|
|         1|-27.460240636363633|153.04186302272726|
|         2| -27.47255990624999|   153.02594553125|
+----------+-------------------+------------------+



Les valeurs obtenues par les deux approches (DSL et SQL) sont identiques.

**8) Bonus : visualisation sur une carte, en corrigeant la ville : les stations se trouvent à Brisbane (et non à Bristol)**

In [145]:
# Mean latitude / longitude over all stations (the centre used for the map).
bristol.createOrReplaceTempView("bristol2")

cordonne = spark.sql("""
SELECT avg(latitude) AS lat_moyenne, avg(longitude) AS long_moyenne
FROM bristol2
""")
# show() only prints and returns None, so keep the DataFrame in `cordonne`
# and display it as a separate statement.
cordonne.show()

+------------------+------------------+
|       lat_moyenne|      long_moyenne|
+------------------+------------------+
|-27.47130457718122|153.02508301342277|
+------------------+------------------+



In [154]:
import os

# Map centre: mean position of all stations (they are in Brisbane), computed
# from the data rather than copied by hand from a previous output.
center_row = bristol.agg(F.avg('latitude').alias('lat'),
                         F.avg('longitude').alias('lon')).first()
coor_moy = [center_row['lat'], center_row['lon']]

# Create the map.
fmap = folium.Map(location=coor_moy, zoom_start=12)

# One marker per cluster, placed at the cluster's mean position (same
# aggregation as step 7). Looping over the clusters keeps the map correct for
# any K read from properties.conf instead of assuming exactly 3 clusters.
MARKER_COLORS = ['green', 'blue', 'red', 'purple', 'orange',
                 'darkred', 'cadetblue', 'darkgreen', 'pink', 'gray']
cluster_centers = (
    fitted.groupby('prediction')
    .agg(F.avg('latitude').alias('lat'), F.avg('longitude').alias('lon'))
    .orderBy('prediction')
    .collect()
)
for row in cluster_centers:
    cluster_id = row['prediction']
    folium.Marker(
        [row['lat'], row['lon']],
        popup=f"prediction {cluster_id}",
        icon=folium.Icon(color=MARKER_COLORS[cluster_id % len(MARKER_COLORS)]),
    ).add_to(fmap)

# Write the map to an HTML file; create the output folder first so that
# save() does not fail when it is missing.
os.makedirs("out_put", exist_ok=True)
fmap.save("out_put/brisbane.html")
             

**9) Exporter le DataFrame `fitted`, après suppression de la colonne `features`, dans le répertoire `path_to_output_data`**


In [153]:
# Drop the assembled vector column: Spark's CSV writer cannot serialise ML vectors.
fitted2 = fitted.drop('features')
# Spark writes a *directory* at path_to_output_data containing part-*.csv
# files, not a single CSV file; header=True writes the column names.
# (index=False is a pandas to_csv option: Spark has no row index and silently
# ignored it, so it is not passed.)
fitted2.write.mode("overwrite").option("header", True).csv(path_to_output_data)

In [155]:
# fin de la session

In [None]:
# Stop the Spark session created in step 1; run this cell last.
spark.stop()