# Spark k-means clustering

In [1]:
# Library imports
# NOTE(review): the wildcard import below puts every public name of
# pyspark.sql.functions into the notebook namespace (the Q7 aggregation uses
# `avg` from it); presumably it also shadows same-named Python builtins -
# confirm before relying on builtins such as sum/min/max in later cells.
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import os, shutil

**Q1) Instanciation du client Spark Session**

In [2]:
# Create the Spark session (or reuse one that is already running): local
# master, application name "kmeans_clustering".
spark = (
    SparkSession.builder
    .master("local")
    .appName("kmeans_clustering")
    .getOrCreate()
)

**Q2) Utilisation du fichier de configuration pour récupérer les chemins**

In [3]:
import configparser

# Load the notebook settings from properties.conf, section [Bristol_city_bike].
config = configparser.ConfigParser()

# ConfigParser.read() skips files it cannot open and returns the list of files
# it actually parsed. Without this check a missing file would only surface
# below as a KeyError on the section name, which hides the real cause.
if not config.read('properties.conf'):
    raise FileNotFoundError(
        "Configuration file 'properties.conf' was not found or could not be "
        "read from the current working directory."
    )

# Input JSON path, output CSV directory and number of clusters. ConfigParser
# values are strings: the cluster count is converted with int() where it is used.
path_to_input_data = config['Bristol_city_bike']['input_data']
path_to_output_data = config['Bristol_city_bike']['output_data']
num_partition_kmeans = config['Bristol_city_bike']['kmeans_level']

**Q3) Importation du fichier Bristol-city-bike.json**

In [14]:
# Read the JSON file whose path comes from the configuration, then preview
# the first five rows and print the inferred schema.
bristol = spark.read.json(path_to_input_data)

bristol.show(n=5)
bristol.printSchema()

+--------------------+----------+----------+--------------------+------+
|             address|  latitude| longitude|                name|number|
+--------------------+----------+----------+--------------------+------+
|Lower River Tce /...|-27.482279|153.028723|122 - LOWER RIVER...|   122|
|Main St / Darragh St| -27.47059|153.036046|91 - MAIN ST / DA...|    91|
|Sydney St Ferry T...|-27.474531|153.042728|88 - SYDNEY ST FE...|    88|
|Browne St / James St|-27.461881|153.046986|75 - BROWNE ST / ...|    75|
|Kurilpa Point / M...|-27.469658|153.016696|98 - KURILPA POIN...|    98|
+--------------------+----------+----------+--------------------+------+
only showing top 5 rows

root
 |-- address: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- name: string (nullable = true)
 |-- number: long (nullable = true)



**Q4) Création d'un nouveau DataFrame contenant uniquement les variables latitude et longitude**

In [5]:
# Keep only the two coordinate columns used for clustering and preview them.
kmeans_df = bristol.select("latitude", "longitude")
kmeans_df.show(n=3)

+----------+----------+
|  latitude| longitude|
+----------+----------+
|-27.482279|153.028723|
| -27.47059|153.036046|
|-27.474531|153.042728|
+----------+----------+
only showing top 3 rows



**Q5) k-means**

In [6]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

# Columns packed into the feature vector, in this order.
features = ('longitude','latitude')

# Estimator: k is read from the configuration (stored there as text, hence
# the int() conversion) and the seed is fixed to 1.
kmeans = KMeans(k = int(num_partition_kmeans), seed = 1)

# Assemble the coordinates into a single vector column named "features".
assembler = VectorAssembler(inputCols = features, outputCol = "features")
dataset = assembler.transform(kmeans_df)

# Fit on the assembled data, then add the "prediction" column to the same rows.
model = kmeans.fit(dataset)
fitted = model.transform(dataset)

In [7]:
# Preview the clustered rows (20 rows, long cell values truncated).
fitted.show(n=20, truncate=True)

+----------+----------+--------------------+----------+
|  latitude| longitude|            features|prediction|
+----------+----------+--------------------+----------+
|-27.482279|153.028723|[153.028723,-27.4...|         2|
| -27.47059|153.036046|[153.036046,-27.4...|         2|
|-27.474531|153.042728|[153.042728,-27.4...|         1|
|-27.461881|153.046986|[153.046986,-27.4...|         1|
|-27.469658|153.016696|[153.016696,-27.4...|         2|
| -27.48172| 153.00436|[153.00436,-27.48...|         0|
|-27.493626|153.001482|[153.001482,-27.4...|         0|
|-27.476076|153.002459|[153.002459,-27.4...|         0|
|-27.493963|153.011938|[153.011938,-27.4...|         0|
|-27.482197|153.020894|[153.020894,-27.4...|         2|
|-27.465226|153.050864|[153.050864,-27.4...|         1|
|-27.468447|153.024662|[153.024662,-27.4...|         2|
|-27.473021|153.025988|[153.025988,-27.4...|         2|
|-27.457825|153.036866|[153.036866,-27.4...|         1|
| -27.48148| 153.02368|[153.02368,-27.48...|    

**Q6) Colonnes de fitted**

In [8]:
# List the column names of the DataFrame returned by model.transform().
fitted.columns

['latitude', 'longitude', 'features', 'prediction']

**Q7) Latitude et longitude moyennes de chaque groupe**

In [9]:
# DSL version: average latitude and longitude per predicted cluster.
(
    fitted
    .groupBy("prediction")
    .agg(
        avg("latitude").alias("Moyenne_latitude"),
        avg("longitude").alias("Moyenne_longitude"),
    )
    .show()
)

+----------+-------------------+------------------+
|prediction|   Moyenne_latitude| Moyenne_longitude|
+----------+-------------------+------------------+
|         1|-27.460240636363633|153.04186302272726|
|         2| -27.47255990624999|   153.02594553125|
|         0|-27.481218536585374|153.00572882926832|
+----------+-------------------+------------------+



In [10]:
# SQL version: expose `fitted` as the temporary view "fitted_sql", then run
# the same per-cluster averages as a query.
fitted.createOrReplaceTempView("fitted_sql")
spark.sql(
    """ SELECT prediction, 
                     AVG(latitude) as Moyenne_latitude, 
                     AVG(longitude) as Moyenne_longitude 
              FROM fitted_sql 
              GROUP BY prediction """
).show()

+----------+-------------------+------------------+
|prediction|   Moyenne_latitude| Moyenne_longitude|
+----------+-------------------+------------------+
|         1|-27.460240636363633|153.04186302272726|
|         2| -27.47255990624999|   153.02594553125|
|         0|-27.481218536585374|153.00572882926832|
+----------+-------------------+------------------+



**Q8) Visualisation sur une carte**

In [11]:
from ipyleaflet import Map, Marker, AwesomeIcon

# Map centre; these coordinates match the cluster 1 averages printed in Q7.
center = (-27.460240636363633, 153.04186302272726)
m = Map(center = center)

# One marker colour per cluster id. The first three are the colours originally
# used for clusters 0, 1 and 2; the extra ones cover larger kmeans_level values.
marker_colors = [
    'lightblue', 'lightred', 'beige',
    'lightgreen', 'orange', 'pink', 'lightgray', 'white',
]
icons = [
    AwesomeIcon(
        name='bicycle',
        marker_color=color,
        icon_color='black',
        spin=False
    )
    for color in marker_colors
]

# Only the columns needed for plotting are collected to the driver; the
# "features" vector column is not used here.
for row in fitted.select("latitude", "longitude", "prediction").collect():
    # The cluster count comes from the configuration file, so wrap around the
    # palette instead of raising IndexError when there are more clusters than icons.
    icon = icons[row["prediction"] % len(icons)]
    point = Marker(location = (row["latitude"], row["longitude"]), draggable = False, icon = icon)
    m.add_layer(point)

m

Map(center=[-27.460240636363633, 153.04186302272726], controls=(ZoomControl(options=['position', 'zoom_in_text…

**Q9) Exportation de fitted**

In [13]:
# Export the clustered rows as CSV to the configured output location.
#
# - The "features" column is dropped on the exported frame only, so `fitted`
#   keeps its full set of columns and earlier cells can be re-run unchanged.
#   (Presumably the column is dropped because the CSV writer cannot serialise
#   vector columns - confirm.)
# - Spark's "overwrite" save mode replaces a previous export itself. This
#   removes the need for the os.path.exists / shutil.rmtree cleanup, which
#   only worked for plain local filesystem paths.
fitted.drop("features").write.csv(path_to_output_data, mode="overwrite")

In [27]:
# Stop the Spark session created in Q1 now that the export is done.
spark.stop()