In [18]:
# Imports for the notebook and creation of the SparkSession read by the later cells
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import leaflet
import folium

# getOrCreate() returns the session that already exists (e.g. one provided by a
# PySpark kernel) and otherwise builds a new one, so `spark` is defined even
# after a kernel restart.
spark = SparkSession.builder.getOrCreate()

In [12]:
import configparser

# Load the run settings (input path, output path, number of k-means clusters)
# from the local properties file.
config = configparser.ConfigParser()
# ConfigParser.read() skips files it cannot open and returns the list of files
# it actually parsed, so fail here with a clear message rather than with an
# opaque KeyError on the lookups below.
if not config.read('properties.conf'):
    raise FileNotFoundError("Configuration file 'properties.conf' was not found or could not be read")
path_to_input_data = config['Brisbane-City-bike']['Input_data']
path_to_output_data = config['Brisbane-City-bike']['Output_data']
num_partition_kmeans = config.getint('Brisbane-City-bike', 'Kmeans_level')


In [10]:
# Load the JSON file.
# The former .option("header", True) was dropped: "header" is a CSV reader
# option and is not used by the JSON data source.
# NOTE(review): `bristol` is not referenced by any other visible cell, and the
# file name differs from the configured input — confirm this cell is still needed.
bristol = spark.read.json("Bristol-city-bike.json")

In [25]:
# Load the input JSON file from the path given in the configuration
brisbane = spark.read.json(path_to_input_data)

In [5]:
# Preview the first three rows
brisbane.show(n=3)

+--------------------+----------+----------+--------------------+------+
|             address|  latitude| longitude|                name|number|
+--------------------+----------+----------+--------------------+------+
|Lower River Tce /...|-27.482279|153.028723|122 - LOWER RIVER...|   122|
|Main St / Darragh St| -27.47059|153.036046|91 - MAIN ST / DA...|    91|
|Sydney St Ferry T...|-27.474531|153.042728|88 - SYDNEY ST FE...|    88|
+--------------------+----------+----------+--------------------+------+
only showing top 3 rows



In [6]:
# Print the schema (column names, types and nullability) of the DataFrame
brisbane.printSchema()

root
 |-- address: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- name: string (nullable = true)
 |-- number: long (nullable = true)



In [9]:
# DataFrame dimensions: number of rows, then number of columns
n_rows = brisbane.count()
n_columns = len(brisbane.columns)
print(n_rows, n_columns)

149 5


In [10]:
# Build the k-means input: keep only the two coordinate columns
kmeans_df = brisbane.select("longitude", "latitude")
kmeans_df.show(n=5)

+----------+----------+
| longitude|  latitude|
+----------+----------+
|153.028723|-27.482279|
|153.036046| -27.47059|
|153.042728|-27.474531|
|153.046986|-27.461881|
|153.016696|-27.469658|
+----------+----------+
only showing top 5 rows



In [13]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

# Pack the two coordinate columns into a single vector column named "features"
features = ("longitude", "latitude")
assembler = VectorAssembler(inputCols=features, outputCol="features")
dataset = assembler.transform(kmeans_df)

# Number of clusters comes from the configuration; the seed is fixed to 1
kmeans = KMeans(k=num_partition_kmeans, seed=1)
model = kmeans.fit(dataset)

# Append the cluster assignment ("prediction" column) to every row
fitted = model.transform(dataset)

In [14]:
# Check which columns the fitted DataFrame contains
fitted.columns

['longitude', 'latitude', 'features', 'prediction']

In [16]:
# Mean longitude and mean latitude per cluster (DataFrame DSL version)
(
    fitted.groupBy("prediction")
    .agg(
        mean("longitude").alias("Moyenne_longitude"),
        mean("latitude").alias("Moyenne_latitude"),
    )
    .orderBy("prediction")
    .show()
)


+----------+------------------+-------------------+
|prediction| Moyenne_longitude|   Moyenne_latitude|
+----------+------------------+-------------------+
|         0|153.00572882926832|-27.481218536585374|
|         1|153.04186302272726|-27.460240636363633|
|         2|   153.02594553125| -27.47255990624999|
+----------+------------------+-------------------+



In [17]:
# Mean longitude and mean latitude per cluster (Spark SQL version).
# The longitude alias is spelled "Moyenne_longitude" so that the result has the
# same column names as the DataFrame DSL version of this aggregation.
fitted.createOrReplaceTempView("fitted_sql")
spark.sql("""
    select prediction,
           avg(longitude) as Moyenne_longitude,
           avg(latitude) as Moyenne_latitude
    from fitted_sql
    group by prediction
    order by prediction
""").show()

+----------+------------------+-------------------+
|prediction|  Moyenne_logitude|   Moyenne_latitude|
+----------+------------------+-------------------+
|         0|153.00572882926832|-27.481218536585374|
|         1|153.04186302272726|-27.460240636363633|
|         2|   153.02594553125| -27.47255990624999|
+----------+------------------+-------------------+



In [39]:
# Geographic coordinates on which the Brisbane map is centred
latitude_brisbane = -27.4710107
longitude_brisbane = 153.0234489
brisbane_map = folium.Map(
    location=[latitude_brisbane, longitude_brisbane],
    zoom_start=12.6,
)

In [24]:
# Bare expression: the map object is the cell's output
brisbane_map

In [26]:
# Collect latitude, longitude, cluster id and name into separate lists.
# All four come from ONE DataFrame in ONE collect, so index i of every list
# refers to the same row. `fitted` has no `name` column, so the assembler and
# the fitted model are re-applied to `brisbane`, which still carries it.
stations_pdf = (
    model.transform(assembler.transform(brisbane))
    .select("latitude", "longitude", "prediction", "name")
    .toPandas()
)
# .tolist() yields plain Python floats/ints/strings rather than numpy scalars
lat = stations_pdf["latitude"].tolist()
long = stations_pdf["longitude"].tolist()
pred = stations_pdf["prediction"].tolist()
name = stations_pdf["name"].tolist()

In [40]:
# Add one marker per point, coloured by its cluster id.
# Clusters 0, 1 and 2 keep blue / red / darkgreen. The number of clusters is
# read from the configuration, so further ids get their own colour and the
# palette wraps around if there are more clusters than colours.
cluster_colors = ["blue", "red", "darkgreen", "purple", "orange", "cadetblue", "darkred", "black"]
for point_lat, point_long, cluster, label in zip(lat, long, pred, name):
    # Deliberately not called `col`: rebinding that name would replace the
    # col() function imported from pyspark.sql.functions and break any cell
    # that calls it when re-run afterwards.
    marker_color = cluster_colors[int(cluster) % len(cluster_colors)]
    folium.Marker(
        [point_lat, point_long],
        popup=label,
        icon=folium.Icon(color=marker_color),
    ).add_to(brisbane_map)
brisbane_map

    

In [35]:
 brisbane_map.save('Map.html')


In [36]:
# Drop the "features" column before export
fitted_export=fitted.drop("features")

In [37]:
import os

# Export the clustered data as CSV.
# os.path.join adds the path separator when the configured output location does
# not end with one; plain string concatenation would otherwise glue the file
# name onto the directory name.
# NOTE(review): to_csv also writes the pandas index as an unnamed first column;
# pass index=False if consumers do not expect it.
fitted_export.toPandas().to_csv(os.path.join(path_to_output_data, "clusters_brisbane.csv"))