### Instanciation du client Spark

In [1]:
from pyspark.sql import SparkSession
# Obtain the Spark session for this notebook (an already-running session is
# reused, otherwise a new one is started under the given application name).
spark = (
    SparkSession.builder
    .appName('KMeans-project')
    .getOrCreate()
)

### Lecture du fichier properties.conf

In [2]:
import configparser
# Load the job settings (input path, output path, number of clusters) from
# the properties file.
config = configparser.ConfigParser()
config_path = "../propriete/properties.conf"
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it actually parsed; fail early instead of hitting an obscure
# KeyError on the section lookup below.
if not config.read(config_path):
    raise FileNotFoundError(
        "Configuration file not found or unreadable: " + config_path
    )
pathin = config['Bristol-City-bike']['Input-data']
pathout = config['Bristol-City-bike']['Output-data']
# Number of clusters (K) for KMeans; getint() converts the raw string to int
# and raises if the option is missing or not an integer.
partkmeans = config.getint('Bristol-City-bike', 'Kmeanslevel')

### Import du fichier Bristol-city-bike.json

In [3]:
# Read the JSON input located at `pathin` into a Spark DataFrame.
bristol = spark.read.json(pathin)

### Création du DataFrame kmeansdf contenant seulement latitude et longitude

In [4]:
# Keep only the two coordinate columns used as clustering inputs.
kmeansdf = bristol.select(["latitude", "longitude"])

### Kmeans

In [5]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
# Step 1: pack the coordinate columns into a single vector column named
# "features".
features = ('latitude', 'longitude')
assembler = VectorAssembler(inputCols=features, outputCol="features")
dataset = assembler.transform(kmeansdf)

# Step 2: configure the estimator with K taken from the configuration and a
# fixed seed so that repeated runs give the same clusters.
kmeans = KMeans()
kmeans.setK(partkmeans)
kmeans.setSeed(1)

# Step 3: fit on the assembled data, then score the same data; the result
# gains a "prediction" column holding the cluster id of each row.
model = kmeans.fit(dataset)
fitted = model.transform(dataset)

### Colonnes de fitted

In [6]:
# Display the column names of the `fitted` DataFrame.
fitted.columns

['latitude', 'longitude', 'features', 'prediction']

### Latitude moyenne et Longitude moyenne pour chaque groupe

In [7]:
# Register `fitted` as a temporary view named "fittedSQL" so it can be
# queried through spark.sql().
fitted.createOrReplaceTempView("fittedSQL")
from pyspark.sql import functions as F

In [8]:
# SQL version: mean latitude and mean longitude per cluster, ordered by
# cluster id, queried from the "fittedSQL" view.
cluster_means_query = """select prediction, Mean(latitude) as MoyLatitude, Mean(longitude) as MoyLongitude 
            from fittedSQL group by prediction order by prediction"""
cluster_means_sql = spark.sql(cluster_means_query)
cluster_means_sql.show()

+----------+-------------------+------------------+
|prediction|        MoyLatitude|      MoyLongitude|
+----------+-------------------+------------------+
|         0|-27.481218536585374|153.00572882926832|
|         1|-27.460240636363633|153.04186302272726|
|         2| -27.47255990624999|   153.02594553125|
+----------+-------------------+------------------+



In [9]:
# DataFrame-API (DSL) version of the same aggregation: mean latitude and mean
# longitude per cluster, ordered by cluster id.
(
    fitted
    .groupby('prediction')
    .agg(
        F.mean('latitude').alias('MoyLatitude'),
        F.mean('longitude').alias('MoyLongitude'),
    )
    .orderBy('prediction')
    .show()
)

+----------+-------------------+------------------+
|prediction|        MoyLatitude|      MoyLongitude|
+----------+-------------------+------------------+
|         0|-27.481218536585374|153.00572882926832|
|         1|-27.460240636363633|153.04186302272726|
|         2| -27.47255990624999|   153.02594553125|
+----------+-------------------+------------------+



### Bonus : Visualisation dans une map

In [10]:
import folium
# Centre of the map as [latitude, longitude].
# NOTE(review): latitude ~ -27.47 / longitude ~ 153.02 does not look like
# Bristol (UK) despite the variable name -- confirm which city the dataset
# actually covers.
bristol_coords = [-27.4710107, 153.0234489]

# One marker colour per cluster id. The first three entries match the original
# colouring (0 -> blue, 1 -> red, 2 -> green); further entries cover K > 3 and
# the palette is cycled if K exceeds its length.
cluster_colors = ['blue', 'red', 'green', 'purple', 'orange',
                  'darkred', 'cadetblue', 'darkgreen']

# Create the map
my_map = folium.Map(location=bristol_coords, zoom_start=14)

# Bring the rows to the driver ONCE. Every collect()/count() call triggers a
# Spark job, so calling collect() inside the loop re-ran the whole pipeline
# several times per row.
stations = fitted.select('latitude', 'longitude', 'prediction').collect()

# Add one marker per row, labelled and coloured by its cluster id
for station in stations:
    cluster = station['prediction']
    folium.Marker(
        [station['latitude'], station['longitude']],
        popup='Cluster {}'.format(cluster),
        icon=folium.Icon(color=cluster_colors[cluster % len(cluster_colors)]),
    ).add_to(my_map)

# Display the map
my_map

### Export du DataFrame fitted après élimination de la colonne features

In [11]:
# Export the clustered rows without the vector column "features" as CSV under
# `pathout`. The default save mode raises an error when the target path
# already exists, which breaks any re-run of the notebook; overwrite instead
# so that Restart & Run All is repeatable.
fitted.drop("features").write.mode("overwrite").csv(pathout)

In [12]:
# Stop the Spark session now that the notebook's work is done.
spark.stop()