# Brisbane City Bike Kmeans Clustering

### Importation des packages nécessaires :

In [19]:
import findspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import folium

In [6]:
pip install configparser

Collecting configparser
  Downloading configparser-5.0.1-py3-none-any.whl (22 kB)
Installing collected packages: configparser
Successfully installed configparser-5.0.1
Note: you may need to restart the kernel to use updated packages.


### 1 - Instancier le client Spark Session :  

In [3]:
# Build the Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName('kmeans')
    .getOrCreate()
)

### 2 - Création et lecture du fichier properties.conf contenant les paramètres du programme

In [4]:
import configparser

# Load the run parameters (input path, output path, number of clusters) from
# the project configuration file.
config_path = 'conf/properties.conf'
config = configparser.ConfigParser()

# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it actually parsed. Check it so a missing file fails with a clear
# message instead of a confusing KeyError on the section lookup below.
if not config.read(config_path):
    raise FileNotFoundError(
        f"Configuration file not found or unreadable: {config_path}"
    )

path_to_input_data = config['Bristol-City-bike']['Input_data']
path_to_output_data = config['Bristol-City-bike']['Output_data']
# Number of clusters (K) passed to the K-means estimator.
num_partition_kmeans = config.getint('Bristol-City-bike', 'Kmeans_level')

### 3 - Importation de la base de données : 

In [5]:
# Read the input JSON data from the configured path and preview the first rows.
brisbane = spark.read.json(path_to_input_data)
brisbane.show()

+--------------------+----------+----------+--------------------+------+
|             address|  latitude| longitude|                name|number|
+--------------------+----------+----------+--------------------+------+
|Lower River Tce /...|-27.482279|153.028723|122 - LOWER RIVER...|   122|
|Main St / Darragh St| -27.47059|153.036046|91 - MAIN ST / DA...|    91|
|Sydney St Ferry T...|-27.474531|153.042728|88 - SYDNEY ST FE...|    88|
|Browne St / James St|-27.461881|153.046986|75 - BROWNE ST / ...|    75|
|Kurilpa Point / M...|-27.469658|153.016696|98 - KURILPA POIN...|    98|
|Montague Rd / Ski...| -27.48172| 153.00436|109 - MONTAGUE RD...|   109|
|Macquarie St / Gu...|-27.493626|153.001482|149 - MACQUARIE S...|   149|
|Bi-centennial Bik...|-27.476076|153.002459|139 - BI-CENTENNI...|   139|
|Sir William McGre...|-27.493963|153.011938|24 - SIR WILLIAM ...|    24|
|Vulture St / Trib...|-27.482197|153.020894|117 - VULTURE ST ...|   117|
|Lamington St / Re...|-27.465226|153.050864|73 - LA

### 4 - Création d'un nouveau data frame kmeans_df contenant seulement les variables latitude et longitude

In [6]:
# Keep only the two coordinate columns that will be clustered.
kmeans_df = brisbane.select('latitude', 'longitude')
kmeans_df.show(n=5)

+----------+----------+
|  latitude| longitude|
+----------+----------+
|-27.482279|153.028723|
| -27.47059|153.036046|
|-27.474531|153.042728|
|-27.461881|153.046986|
|-27.469658|153.016696|
+----------+----------+
only showing top 5 rows



### 5 - Clustering K-means :

In [7]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler

# Pack the coordinate columns into the single vector column "features".
features = ('longitude','latitude')
assembler = VectorAssembler(inputCols=features, outputCol="features")
dataset = assembler.transform(kmeans_df)

# K is read from the configuration file; the seed is fixed to 1.
kmeans = KMeans(k=num_partition_kmeans, seed=1)
model = kmeans.fit(dataset)

# Append the cluster id of every row as the "prediction" column.
fitted = model.transform(dataset)

### 6 - les noms des colonnes de fitted :

In [118]:
# List the column names of the clustered data frame.
fitted.columns

['latitude', 'longitude', 'features', 'prediction']

### 7 - Détermination des longitudes et latitudes moyennes pour chaque groupe en utilisant spark DSL et SQL

In [20]:
# DSL version: average latitude and longitude of the rows in each cluster,
# listed in cluster-id order.
cluster_means = (
    fitted.groupBy('prediction')
    .agg(
        mean('latitude').alias('latitude_moyenne'),
        mean('longitude').alias('longitude_moyenne'),
    )
    .orderBy('prediction')
)
cluster_means.show()

+----------+-------------------+------------------+
|prediction|   latitude_moyenne| longitude_moyenne|
+----------+-------------------+------------------+
|         0|-27.481218536585374|153.00572882926832|
|         1|-27.460240636363633|153.04186302272726|
|         2| -27.47255990624999|   153.02594553125|
+----------+-------------------+------------------+



In [21]:
#Version SQL :
# SQL version: expose the clustered frame as a temporary view and run the same
# per-cluster averages as a SQL query.
fitted.createOrReplaceTempView("fitted")
mean_coordinates_query = """
    SELECT prediction, avg(latitude) as Latitude_moyenne, avg(longitude) as Longitude_moyenne 
    FROM fitted 
    GROUP BY prediction
    ORDER BY prediction"""
spark.sql(mean_coordinates_query).show()

+----------+-------------------+------------------+
|prediction|   Latitude_moyenne| Longitude_moyenne|
+----------+-------------------+------------------+
|         0|-27.481218536585374|153.00572882926832|
|         1|-27.460240636363633|153.04186302272726|
|         2| -27.47255990624999|   153.02594553125|
+----------+-------------------+------------------+



### 8 - Visualisation sur une carte avec le package folium :

In [219]:
# Score the full station table (not only the two coordinate columns) so each
# station name stays on the same row as its coordinates and cluster id.
# The previous version read the names from an undefined `bristol` frame and
# collected every column with a separate Spark job, which relied on both
# frames returning their rows in exactly the same order.
stations = (
    model.transform(assembler.transform(brisbane))
    .select('latitude', 'longitude', 'name', 'prediction')
    .collect()
)

brisbane_map = folium.Map(location=[-27.47, 153.02], zoom_start=12.5)

# Fill colour per cluster id. The index wraps around so that a configured K
# larger than the palette reuses colours instead of raising IndexError.
colors = ['red', 'blue', 'yellow']

for station in stations:
    # Popup shown when the marker is clicked: the station name.
    label = folium.Popup(station['name'], parse_html=True)
    folium.CircleMarker(
        [station['latitude'], station['longitude']],
        radius=5,
        popup=label,
        color='black',
        fill=True,
        fill_color=colors[station['prediction'] % len(colors)],
        fill_opacity=0.7).add_to(brisbane_map)

# Last expression of the cell: renders the interactive map inline.
brisbane_map

### 9 - Exporter le data frame fitted après élimination de la colonne features :

In [16]:
import os

# Drop the assembled 'features' vector column and write the remaining columns
# to fitted.csv in the configured output directory.
# os.path.join adds the path separator when the configured directory does not
# end with one; plain string concatenation would write "<dir>fitted.csv".
output_file = os.path.join(path_to_output_data, 'fitted.csv')
fitted.drop('features').toPandas().to_csv(output_file)

### Arrêt de la session Spark :

In [None]:
# Stop the Spark session once the analysis is finished.
spark.stop()