 Importation des bibliothèques

In [1]:
# Locate the Spark installation with findspark, then import the PySpark entry points.
# Statement order matters: findspark.init(...) runs before `import pyspark`.
# NOTE(review): the path passed to init() is an absolute, machine-specific location —
# confirm it (or make it configurable) before running this notebook elsewhere.
import findspark
findspark.init("/home/adam/Documents/spark/sp")
import pyspark 
# The value returned by find() is not stored, and it is not the last expression of the
# cell, so nothing is displayed for it.
findspark.find()
from pyspark import SparkContext
from pyspark.sql import SparkSession
# NOTE(review): wildcard imports pull every public name of these modules into the
# notebook namespace; pyspark.sql.functions exports names such as `sum`, `min` and `max`,
# which then hide the Python built-ins of the same name.
from pyspark.sql.functions import *
from pyspark.sql.types import *

### 1- Instanciation du client SparkSession

In [2]:
# Create (or reuse) the notebook's SparkSession with master "local[3]"
# and the application name "Kmeans_Clustering".
spark = (
    SparkSession.builder
    .master("local[3]")
    .appName("Kmeans_Clustering")
    .getOrCreate()
)


### 2- Création d'un fichier properties.conf contenant les informations relatives aux paramètres du programme, au lieu de les écrire en dur dans le code.

In [3]:
import configparser

# Load the notebook settings from properties.conf (resolved against the working directory).
# read() returns the list of files it parsed successfully; as the last expression of the
# cell, that list is displayed and serves as a visual check that the file was found.
config = configparser.ConfigParser()
config.read(['properties.conf'])

['properties.conf']

Utilisation du fichier de configuration pour récupérer les chemins d'entrée et de sortie ainsi que le nombre de clusters.


In [4]:
# All settings used by this notebook live in the [Bristol-City-bike] section.
bristol_section = config['Bristol-City-bike']

# Location of the JSON input and of the CSV export directory.
path_to_input_data = bristol_section['Input-data']
path_to_output_data = bristol_section['Output-data']

# configparser returns strings; the value is later passed to KMeans as k, so cast it to int.
num_partition_kmeans = int(bristol_section['Kmeans-level'])

### 3- Importation du fichier JSON avec Spark

In [5]:
# Read the JSON input configured in properties.conf into a DataFrame.
bristol = spark.read.format("json").load(path_to_input_data)


Affichage des deux premières lignes

In [6]:
# Print only the first two rows as a quick sanity check of the loaded data.
bristol.show(n=2)

+--------------------+----------+----------+--------------------+------+
|             address|  latitude| longitude|                name|number|
+--------------------+----------+----------+--------------------+------+
|Lower River Tce /...|-27.482279|153.028723|122 - LOWER RIVER...|   122|
|Main St / Darragh St| -27.47059|153.036046|91 - MAIN ST / DA...|    91|
+--------------------+----------+----------+--------------------+------+
only showing top 2 rows



### 4- Création d'un nouveau DataFrame Kmeans_df contenant seulement les variables latitude et longitude.

In [7]:
# Keep only the two coordinate columns used for the clustering.
coordinate_columns = ["latitude", "longitude"]
Kmeans_df = bristol.select(coordinate_columns)

 Affichage des deux premières lignes

In [9]:
# Print the first two rows of the coordinates-only DataFrame.
Kmeans_df.show(n=2)

+----------+----------+
|  latitude| longitude|
+----------+----------+
|-27.482279|153.028723|
| -27.47059|153.036046|
+----------+----------+
only showing top 2 rows



### 5- Clustering K-means.

In [11]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler

# Input columns, in the order in which they are packed into the feature vector.
features = ('longitude','latitude')

# Assemble the coordinate columns into a single vector column named "features".
assembler = VectorAssembler(inputCols=features, outputCol="features")
dataset = assembler.transform(Kmeans_df)

# k comes from properties.conf; the seed is fixed to 1 (presumably for repeatable runs).
kmeans = KMeans(k=num_partition_kmeans, seed=1)
model = kmeans.fit(dataset)

# Apply the fitted model to the same rows it was trained on.
fitted = model.transform(dataset)

### 6- Noms des colonnes de fitted 

In [12]:
# Display the column names of the DataFrame produced by model.transform(...).
fitted.columns

['latitude', 'longitude', 'features', 'prediction']

### 7- Détermination des longitudes et des latitudes moyennes pour chaque groupe en utilisant Spark

Spark DSL

In [13]:
# Average latitude and longitude per cluster, computed with the DataFrame API.
# DataFrame.show() only prints and returns None, so the aggregation is bound to
# moy_DSL first and displayed in a separate statement; moy_DSL then stays usable
# as a DataFrame instead of being None.
moy_DSL = (
    fitted
    .groupBy("prediction")
    .mean("latitude", "longitude")
)
moy_DSL.show()

+----------+-------------------+------------------+
|prediction|      avg(latitude)|    avg(longitude)|
+----------+-------------------+------------------+
|         1|-27.460240636363633|153.04186302272726|
|         2| -27.47255990624999|   153.02594553125|
|         0|-27.481218536585374|153.00572882926832|
+----------+-------------------+------------------+



Spark SQL

In [14]:
# Register the DataFrame as a temporary view so it can be queried with SQL.
fitted.createOrReplaceTempView("fittedSQL")

# Same per-cluster averages as the DSL version, written as a SQL query.
mean_by_cluster_query = """ select prediction, 
                 mean(latitude) as Moy_Latitude,
                 mean(longitude) as Moy_Longitude
           from fittedSQL
           group by prediction   """
spark.sql(mean_by_cluster_query).show()

+----------+-------------------+------------------+
|prediction|       Moy_Latitude|     Moy_Longitude|
+----------+-------------------+------------------+
|         1|-27.460240636363633|153.04186302272726|
|         2| -27.47255990624999|   153.02594553125|
|         0|-27.481218536585374|153.00572882926832|
+----------+-------------------+------------------+



### 9- Exportation du DataFrame fitted, après suppression de la colonne features, dans le répertoire path_to_output_data

In [15]:
# Export the clustered rows as CSV (with a header line) into path_to_output_data,
# replacing any previous export.
# The column-pruned frame gets its own name and the result of save() is not assigned:
# save() returns None, so rebinding `fitted` to it would destroy the DataFrame and make
# this cell fail when run a second time.
fitted_export = fitted.drop('features')
(
    fitted_export.write
    .format("csv")
    .mode("overwrite")
    .option("header", "true")
    .save(path_to_output_data)
)