# Importation des packages 

In [1]:
import findspark
findspark.init("C:/spark")
import pyspark 
findspark.find()

'C:/spark'

In [2]:
from pyspark.sql import SparkSession
import configparser
from pyspark.sql.functions import *
import folium
import pandas as pd


## 1- Instancier Client Spark session

In [3]:
spark = SparkSession.builder \
    .master("local") \
    .appName("Bristol-City-bike") \
    .getOrCreate()

# 2- Récupération des path

In [4]:
config = configparser.ConfigParser()
config.read('properties.conf')

['properties.conf']

In [5]:
path_to_input_data = config['Bristol-City-bike']['Input-data']
path_to_output_data = config['Bristol-City-bike']['Output-data']
num_partition_kmeans = int(config['Bristol-City-bike']['Kmeans-level'])

# 3- importation des données 

In [6]:
bristol = spark.read.json(path_to_input_data)
bristol.show(4)

+--------------------+----------+----------+--------------------+------+
|             address|  latitude| longitude|                name|number|
+--------------------+----------+----------+--------------------+------+
|Lower River Tce /...|-27.482279|153.028723|122 - LOWER RIVER...|   122|
|Main St / Darragh St| -27.47059|153.036046|91 - MAIN ST / DA...|    91|
|Sydney St Ferry T...|-27.474531|153.042728|88 - SYDNEY ST FE...|    88|
|Browne St / James St|-27.461881|153.046986|75 - BROWNE ST / ...|    75|
+--------------------+----------+----------+--------------------+------+
only showing top 4 rows



# 4- Nouveau dataframe

In [7]:
Kmeans_df = bristol.select(col("latitude"),col("longitude"))
Kmeans_df.show(4)

+----------+----------+
|  latitude| longitude|
+----------+----------+
|-27.482279|153.028723|
| -27.47059|153.036046|
|-27.474531|153.042728|
|-27.461881|153.046986|
+----------+----------+
only showing top 4 rows



# 5- Clustering à l'aide des Kmeans

In [8]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
features = ("longitude","latitude")
kmeans = KMeans().setK(num_partition_kmeans).setSeed(1)
assembler = VectorAssembler(inputCols=features,outputCol="features")
dataset = assembler.transform(Kmeans_df)
model = kmeans.fit(dataset)
fitted = model.transform(dataset)

# 6- les noms des colonnes de fitted

In [9]:
fitted.columns

['latitude', 'longitude', 'features', 'prediction']

# 7- la moyenne de latitude et longitude en fonction du cluster

In [10]:
#SPARK SQL
fitted.createOrReplaceTempView("fittedSQL")
spark.sql("""
select mean(latitude) as moy_latitude, mean(longitude) as moy_longitude, prediction
from fittedSQL
group By prediction
""").show()

+-------------------+------------------+----------+
|       moy_latitude|     moy_longitude|prediction|
+-------------------+------------------+----------+
|-27.460240636363633|153.04186302272726|         1|
| -27.47255990624999|   153.02594553125|         2|
|-27.481218536585374|153.00572882926832|         0|
+-------------------+------------------+----------+



In [11]:
# SPARK DSL
fitted.groupBy("prediction").agg(mean(col("latitude")).alias('moy_latitude'),mean(col("longitude")).alias('moy_longitude')).show()

+----------+-------------------+------------------+
|prediction|       moy_latitude|     moy_longitude|
+----------+-------------------+------------------+
|         1|-27.460240636363633|153.04186302272726|
|         2| -27.47255990624999|   153.02594553125|
|         0|-27.481218536585374|153.00572882926832|
+----------+-------------------+------------------+



# 8 - Visualisation des résultats

In [12]:
# transformation de fitted en dataframe et calcul des moyennes
fitted_df= fitted.toPandas()

moy_long=fitted_df['longitude'].mean()
moy_lat=fitted_df['latitude'].mean()

# Creation de la carte
Map = folium.Map(location = [moy_lat,moy_long], zoom_start = 13)

def color(row):
    if row['prediction'] == 0:
        color= "blue"
    elif row['prediction'] == 1:
        color= "red"
    else:
        color = "green"
    return color

for index, row in fitted_df.iterrows():
    folium.Marker(location = [row['latitude'], row['longitude']], icon = folium.Icon(color(row))).add_to(Map)

# Affichage de la carte
Map

# 9- Exportation des résultats du clustering

In [13]:
fitted.drop("features")\
      .toPandas().to_csv(path_to_output_data+"resultats.csv")

In [None]:
spark.stop()