# SPARK K-MEANS CLUSTERING

L'objectif principal de ce projet est de réaliser, avec Spark, un clustering k-means des stations de vélos en libre-service de Brisbane (City Bike) en fonction de leur emplacement géographique. Le fichier BRISBANE-city-bike.json contient, pour chaque station vélo, son adresse, son nom, son numéro et ses coordonnées (latitude, longitude).
  


In [17]:
# Importation des librairies
import findspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import folium

In [18]:
# Create the Spark session for this notebook (or reuse one already running).
spark = (
    SparkSession.builder
    .appName("Projet_Kmeans")
    .getOrCreate()
)

In [19]:
import configparser

# Read the run parameters (input/output locations, number of clusters) from
# properties.conf, section 'Bristol-City-bike'.
config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list of
# files it actually parsed; fail fast with a clear message instead of hitting an
# opaque KeyError on the section lookups below.
if not config.read("properties.conf"):
    raise FileNotFoundError("Configuration file 'properties.conf' not found or unreadable")
# NOTE(review): the section is named 'Bristol-City-bike' although the input file
# is BRISBANE-city-bike.json; kept as-is because it must match properties.conf.
path_to_input_data = config['Bristol-City-bike']['Input_data']
path_to_output_data = config['Bristol-City-bike']['Output_data']
# Despite its name, this value is passed to KMeans as k (the number of clusters).
num_partition_kmeans = config.getint('Bristol-City-bike', 'Kmeans_level')


In [20]:
# Load the bike-station dataset from the configured JSON input and preview 5 rows.
df = spark.read.json(path_to_input_data)
df.show(5)

+--------------------+----------+----------+--------------------+------+
|             address|  latitude| longitude|                name|number|
+--------------------+----------+----------+--------------------+------+
|Lower River Tce /...|-27.482279|153.028723|122 - LOWER RIVER...|   122|
|Main St / Darragh St| -27.47059|153.036046|91 - MAIN ST / DA...|    91|
|Sydney St Ferry T...|-27.474531|153.042728|88 - SYDNEY ST FE...|    88|
|Browne St / James St|-27.461881|153.046986|75 - BROWNE ST / ...|    75|
|Kurilpa Point / M...|-27.469658|153.016696|98 - KURILPA POIN...|    98|
+--------------------+----------+----------+--------------------+------+
only showing top 5 rows



In [21]:
# Quick look at the first two latitude values.
df.select("latitude").show(2)

+----------+
|  latitude|
+----------+
|-27.482279|
| -27.47059|
+----------+
only showing top 2 rows



In [22]:
# Keep only the coordinate columns that will be used as clustering features.
kmeans_df = df.select("latitude", "longitude")
kmeans_df.show(5)

+----------+----------+
|  latitude| longitude|
+----------+----------+
|-27.482279|153.028723|
| -27.47059|153.036046|
|-27.474531|153.042728|
|-27.461881|153.046986|
|-27.469658|153.016696|
+----------+----------+
only showing top 5 rows



In [23]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

# Columns packed into the "features" vector, in this order: longitude, latitude.
features = ("longitude", "latitude")
assembler = VectorAssembler(inputCols=list(features), outputCol="features")

# k is read from the 'Kmeans_level' config entry; the fixed seed makes runs reproducible.
kmeans = KMeans(k=num_partition_kmeans, seed=1)

dataset = assembler.transform(kmeans_df)
model = kmeans.fit(dataset)
# Appends a 'prediction' column holding each station's cluster index.
fitted = model.transform(dataset)

In [24]:
# List the column names produced by the clustering pipeline.
fitted.schema.names

['latitude', 'longitude', 'features', 'prediction']

In [25]:
# Mean latitude/longitude of the stations assigned to each cluster.
fitted.groupBy("prediction").avg("latitude", "longitude").show()

+----------+-------------------+------------------+
|prediction|      avg(latitude)|    avg(longitude)|
+----------+-------------------+------------------+
|         1|-27.460240636363633|153.04186302272726|
|         2| -27.47255990624999|   153.02594553125|
|         0|-27.481218536585374|153.00572882926832|
+----------+-------------------+------------------+



In [26]:
# Same per-cluster mean position, computed through Spark SQL on a temp view.
fitted.createOrReplaceTempView("fitted")
centroids_sql = """SELECT prediction, mean(longitude) as m_long, mean(latitude) as m_lat
             FROM fitted
             GROUP BY prediction"""
spark.sql(centroids_sql).show()

+----------+------------------+-------------------+
|prediction|            m_long|              m_lat|
+----------+------------------+-------------------+
|         1|153.04186302272726|-27.460240636363633|
|         2|   153.02594553125| -27.47255990624999|
|         0|153.00572882926832|-27.481218536585374|
+----------+------------------+-------------------+



In [55]:
# Graphical display: one circle per bike station, coloured by its KMeans cluster.
world_map = folium.Map(location=[-27.48, 153.01], zoom_start=13)

# Collect coordinates and cluster labels in ONE Spark job. Separate toPandas()
# calls each re-run the pipeline and rely on Spark returning rows in the same
# order every time; a single select keeps the three lists row-aligned.
points = fitted.select('latitude', 'longitude', 'prediction').toPandas()
lat = list(points['latitude'])
long = list(points['longitude'])
pred = list(points['prediction'])

# Colour per cluster id. The number of clusters comes from the configuration,
# so wrap around with a modulo rather than raising IndexError when k is larger
# than the palette. The first three entries match the original colours.
colors = ["orangered", "blue", "yellow", "green", "purple", "darkred", "cadetblue", "black"]

x = folium.map.FeatureGroup()
for point_lat, point_long, cluster in zip(lat, long, pred):
    colour = colors[int(cluster) % len(colors)]
    x.add_child(folium.CircleMarker([point_lat, point_long], radius=5,
                                    color=colour, fill_color=colour))
# Attach the fully populated layer to the map once, after the loop.
world_map.add_child(x)

In [56]:
# Render the interactive folium map: as the cell's last expression, Jupyter displays it.
world_map

In [57]:
# Data export: write the clustered stations (without the assembled 'features'
# vector column) to fitted.csv in the configured output directory.
import os

# os.path.join inserts a separator only when needed, so the configured
# Output_data value works with or without a trailing slash (plain `+` would
# otherwise produce e.g. "outputfitted.csv").
# NOTE(review): assumes Output_data is a directory, not a filename prefix — confirm.
fitted.drop('features').toPandas().to_csv(os.path.join(path_to_output_data, 'fitted.csv'))

In [58]:
# Stop the Spark session now that clustering, visualisation and export are done.
spark.stop()