In [40]:
# 1 - Locate the local Spark installation so that pyspark can be imported.
import os
import findspark

# Prefer an existing, non-empty SPARK_HOME so the notebook is not tied to one
# machine's directory layout; "C:/spark" remains the fallback, which preserves
# the original behaviour whenever the variable is unset.
findspark.init(os.environ.get("SPARK_HOME") or "C:/spark")
import pyspark  # kept after findspark.init(), as in the original cell order
findspark.find()  # last expression of the cell: displays the Spark home findspark reports

'C:/spark'

In [42]:
# Import the libraries used by the notebook.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
import numpy as np
import pandas as pd
import configparser

In [43]:
# Build (or reuse) a Spark session running on the "local" master, named "Kmeans".
spark = (
    SparkSession.builder
    .master("local")
    .appName("Kmeans")
    .getOrCreate()
)


In [53]:
# 2 - Create a properties.conf file holding the program's hard-coded parameters.
# The `with` block closes the handle even if one of the writes raises, which
# the previous manual open()/close() pair did not guarantee.
with open('properties.conf', 'w') as fichier:
    fichier.write("[Bristol-City-bike]\n")
    fichier.write("Input-data=data/Bristol-city-bike.json\n")
    fichier.write("Output-data=exported\n")
    fichier.write("Kmeans-level=3")

In [54]:
# Parse properties.conf into a ConfigParser object.
config = configparser.ConfigParser()
# read() is the last expression of the cell, so the notebook displays its
# return value (the list of files that were parsed).
# NOTE(review): per the configparser documentation, read() skips files it
# cannot open instead of raising — a missing properties.conf would only show
# up later as a KeyError on the section lookup. Consider checking the result.
config.read("properties.conf")


['properties.conf']

In [55]:
# Pull the run parameters out of the [Bristol-City-bike] section.
bike_settings = config['Bristol-City-bike']
path_to_input_data = bike_settings['Input-data']
path_to_output_data = bike_settings['Output-data']
# Stored as text in the file; converted to an integer here.
num_partition_kmeans = int(bike_settings['Kmeans-level'])

In [62]:
# 3 - Load the JSON file with Spark, using the path_to_input_data variable.
brisbane = spark.read.json(path_to_input_data)
brisbane.show()  # print a preview of the loaded rows

+--------------------+----------+----------+--------------------+------+
|             address|  latitude| longitude|                name|number|
+--------------------+----------+----------+--------------------+------+
|Lower River Tce /...|-27.482279|153.028723|122 - LOWER RIVER...|   122|
|Main St / Darragh St| -27.47059|153.036046|91 - MAIN ST / DA...|    91|
|Sydney St Ferry T...|-27.474531|153.042728|88 - SYDNEY ST FE...|    88|
|Browne St / James St|-27.461881|153.046986|75 - BROWNE ST / ...|    75|
|Kurilpa Point / M...|-27.469658|153.016696|98 - KURILPA POIN...|    98|
|Montague Rd / Ski...| -27.48172| 153.00436|109 - MONTAGUE RD...|   109|
|Macquarie St / Gu...|-27.493626|153.001482|149 - MACQUARIE S...|   149|
|Bi-centennial Bik...|-27.476076|153.002459|139 - BI-CENTENNI...|   139|
|Sir William McGre...|-27.493963|153.011938|24 - SIR WILLIAM ...|    24|
|Vulture St / Trib...|-27.482197|153.020894|117 - VULTURE ST ...|   117|
|Lamington St / Re...|-27.465226|153.050864|73 - LA

In [63]:
# 4 - Build a new DataFrame, Kmeansdf, containing only the latitude and longitude columns.
coordinate_columns = ("latitude", "longitude")
Kmeansdf = brisbane.select(*coordinate_columns)
Kmeansdf.show()

+----------+----------+
|  latitude| longitude|
+----------+----------+
|-27.482279|153.028723|
| -27.47059|153.036046|
|-27.474531|153.042728|
|-27.461881|153.046986|
|-27.469658|153.016696|
| -27.48172| 153.00436|
|-27.493626|153.001482|
|-27.476076|153.002459|
|-27.493963|153.011938|
|-27.482197|153.020894|
|-27.465226|153.050864|
|-27.468447|153.024662|
|-27.473021|153.025988|
|-27.457825|153.036866|
| -27.48148| 153.02368|
|-27.467464|153.022094|
|-27.499963|153.017633|
|-27.490776|152.994747|
|-27.458199|153.041688|
|-27.481808|153.025477|
+----------+----------+
only showing top 20 rows



In [64]:
# 5 - K-means clustering on the coordinates.
features = ("longitude", "latitude")

# Pack the coordinate columns into a single vector column named "features".
assembler = VectorAssembler(inputCols=features, outputCol="features")
dataset = assembler.transform(Kmeansdf)

# k comes from the configuration file; the seed is fixed to 1.
kmeans = KMeans(k=num_partition_kmeans, seed=1)
model = kmeans.fit(dataset)

# Apply the fitted model to the same rows it was trained on.
fitted = model.transform(dataset)

In [65]:
# 6 - Column names of `fitted`.
fitted.show()

# Actually verify (rather than only eyeball) that the columns are latitude,
# longitude, features and prediction. The comparison ignores column order.
expected_columns = {"latitude", "longitude", "features", "prediction"}
assert set(fitted.columns) == expected_columns, fitted.columns

# Last expression of the cell: the notebook displays the list of column names.
fitted.columns


+----------+----------+--------------------+----------+
|  latitude| longitude|            features|prediction|
+----------+----------+--------------------+----------+
|-27.482279|153.028723|[153.028723,-27.4...|         2|
| -27.47059|153.036046|[153.036046,-27.4...|         2|
|-27.474531|153.042728|[153.042728,-27.4...|         1|
|-27.461881|153.046986|[153.046986,-27.4...|         1|
|-27.469658|153.016696|[153.016696,-27.4...|         2|
| -27.48172| 153.00436|[153.00436,-27.48...|         0|
|-27.493626|153.001482|[153.001482,-27.4...|         0|
|-27.476076|153.002459|[153.002459,-27.4...|         0|
|-27.493963|153.011938|[153.011938,-27.4...|         0|
|-27.482197|153.020894|[153.020894,-27.4...|         2|
|-27.465226|153.050864|[153.050864,-27.4...|         1|
|-27.468447|153.024662|[153.024662,-27.4...|         2|
|-27.473021|153.025988|[153.025988,-27.4...|         2|
|-27.457825|153.036866|[153.036866,-27.4...|         1|
| -27.48148| 153.02368|[153.02368,-27.48...|    

['latitude', 'longitude', 'features', 'prediction']

In [75]:
# Additional Spark imports.
# NOTE(review): the two wildcard imports add every public name of
# pyspark.sql.functions and pyspark.sql.types to the notebook namespace;
# pyspark.sql.functions is already imported as `F` earlier in the notebook —
# consider explicit names instead.
from pyspark.context import SparkContext
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [76]:
# DSL (DataFrame API): mean latitude and longitude of each cluster.
# The aggregate is referenced through the `F` alias (pyspark.sql.functions,
# imported with the other libraries) so this cell does not rely on a bare
# `mean` name being present in the notebook namespace. The chain is wrapped
# in parentheses so each `.alias(...)` stays next to the column it renames.
(
    fitted.groupBy("prediction")
    .agg(
        F.mean("latitude").alias("LatitudeMean"),
        F.mean("longitude").alias("LongitudeMean"),
    )
    .show()
)

+----------+-------------------+------------------+
|prediction|       LatitudeMean|     LongitudeMean|
+----------+-------------------+------------------+
|         1|-27.460240636363633|153.04186302272726|
|         2| -27.47255990624999|   153.02594553125|
|         0|-27.481218536585374|153.00572882926832|
+----------+-------------------+------------------+



In [79]:
# SQL
# Register the DataFrame as a temporary view so it can be queried like a table.
fitted.createOrReplaceTempView("fittedSQL")

# Same per-cluster means as the DSL cell, expressed as a SQL query.
cluster_means_query = """select prediction,
  mean(latitude) as LatitudeMean,
    mean(longitude) as LongitudeMean
    from fittedSQL
    group by prediction"""
cluster_means = spark.sql(cluster_means_query)
cluster_means.show()

+----------+-------------------+------------------+
|prediction|       LatitudeMean|     LongitudeMean|
+----------+-------------------+------------------+
|         1|-27.460240636363633|153.04186302272726|
|         2| -27.47255990624999|   153.02594553125|
|         0|-27.481218536585374|153.00572882926832|
+----------+-------------------+------------------+



In [83]:
# 9 - Remove the "features" column.
columns_to_drop = ("features",)
NewData = fitted.drop(*columns_to_drop)
NewData.show()

+----------+----------+----------+
|  latitude| longitude|prediction|
+----------+----------+----------+
|-27.482279|153.028723|         2|
| -27.47059|153.036046|         2|
|-27.474531|153.042728|         1|
|-27.461881|153.046986|         1|
|-27.469658|153.016696|         2|
| -27.48172| 153.00436|         0|
|-27.493626|153.001482|         0|
|-27.476076|153.002459|         0|
|-27.493963|153.011938|         0|
|-27.482197|153.020894|         2|
|-27.465226|153.050864|         1|
|-27.468447|153.024662|         2|
|-27.473021|153.025988|         2|
|-27.457825|153.036866|         1|
| -27.48148| 153.02368|         2|
|-27.467464|153.022094|         2|
|-27.499963|153.017633|         0|
|-27.490776|152.994747|         0|
|-27.458199|153.041688|         1|
|-27.481808|153.025477|         2|
+----------+----------+----------+
only showing top 20 rows

