In [1]:
import configparser
import os

import findspark
findspark.init("C:/spark")
import folium
#from folium.pulgins import MarkerCluster

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

from pyspark.sql import SparkSession, functions as F

#### 1. Instancier le client Spark

In [2]:
# Create (or reuse) the Spark session that the cells below rely on, using the "local" master.
# NOTE(review): the appName value looks like a data path, but it is only passed as the
# application name here — confirm whether a plainer label is intended.
spark = (
    SparkSession.builder
    .master("local")
    .appName("../data/Brisbane_city_bike")
    .getOrCreate()
)

In [10]:
# Parser object that will hold the program settings (sections, options, values).
# `configparser` is imported together with the other modules in the notebook's first cell.
config = configparser.ConfigParser()

#### 2. Créer un fichier properties.conf contenant les paramètres du programme jusqu'ici écrits en dur (dans ce notebook, le fichier s'appelle `../src/propreties.config`).

In [11]:
# Create the settings file on disk. In notebook order `config` has no section yet,
# so this writes an empty file; the next cell overwrites it with the real settings.
# The `with` block closes the file handle once the write is done.
with open('../src/propreties.config', 'w') as config_file:
    config.write(config_file)

In [12]:
# Fill in the 'Bristol-City-bike' section and save it to the settings file.
# The has_section guard makes the cell safe to re-run: add_section raises
# DuplicateSectionError when the section already exists.
if not config.has_section('Bristol-City-bike'):
    config.add_section('Bristol-City-bike')
config.set('Bristol-City-bike', 'Input-data', '../data/Bristol-city-bike.json')
config.set('Bristol-City-bike', 'Output-data', '../exported/')
# configparser stores every value as text; the number is converted back to int when it is used.
config.set('Bristol-City-bike', 'Kmeans-level', '3')

# The `with` block closes the file handle once the settings are written.
with open('../src/propreties.config', 'w') as config_file:
    config.write(config_file)

In [13]:
# Load the settings back from the file. The value returned by read() is the
# last expression of the cell, so it is displayed as the cell output.
config.read('../src/propreties.config')

['../src/propreties.config']

In [14]:
# Show the (option, value) pairs stored in the 'Bristol-City-bike' section.
print(config.items('Bristol-City-bike'))

[('input-data', '../data/Bristol-city-bike.json'), ('output-data', '../exported/'), ('kmeans-level', '3')]


In [15]:
# Show the names of the sections currently held by the parser.
print(config.sections())

['Bristol-City-bike']


In [16]:
# Walk through every section of the parser, print each option with its value,
# and collect the pairs in one flat dictionary keyed by option name.
# (Options sharing a name across sections would overwrite each other.)
configuration = {}
for section in config.sections():
    print("Section [%s]" % section)
    for option in config.options(section):
        value = config.get(section, option)
        print("|%s|%s|" % (option, value))
        configuration[option] = value

# Final view of the collected settings.
print(configuration)

Section [Bristol-City-bike]
|input-data|../data/Bristol-city-bike.json|
|output-data|../exported/|
|kmeans-level|3|
{'input-data': '../data/Bristol-city-bike.json', 'output-data': '../exported/', 'kmeans-level': '3'}


In [17]:
# Pull the three settings out of the dictionary built from the config file.
path_to_input_data = configuration['input-data']
path_to_output_data = configuration['output-data']
# The value is stored as text in the config file, hence the int() conversion.
num_partition_kmeans = int(configuration['kmeans-level'])

#### 3. Importer le JSON avec Spark (`bristol = spark.read.json(...)`) en utilisant la variable `path_to_input_data`.

In [18]:
# Load the station file named in the settings into a Spark DataFrame.
# No "header" option is set: that is a CSV reader setting and has no meaning for JSON input.
Brisbane_city_bike = spark.read.json(path_to_input_data)

# Preview the first five rows.
Brisbane_city_bike.show(5)


+--------------------+----------+----------+--------------------+------+
|             address|  latitude| longitude|                name|number|
+--------------------+----------+----------+--------------------+------+
|Lower River Tce /...|-27.482279|153.028723|122 - LOWER RIVER...|   122|
|Main St / Darragh St| -27.47059|153.036046|91 - MAIN ST / DA...|    91|
|Sydney St Ferry T...|-27.474531|153.042728|88 - SYDNEY ST FE...|    88|
|Browne St / James St|-27.461881|153.046986|75 - BROWNE ST / ...|    75|
|Kurilpa Point / M...|-27.469658|153.016696|98 - KURILPA POIN...|    98|
+--------------------+----------+----------+--------------------+------+
only showing top 5 rows



In [19]:
# Descriptive statistics: drop the address, name and number columns, then
# summarise what is left (latitude and longitude in the loaded data).
Brisbane_city_bike.drop("address","name","number").summary().show()

+-------+-------------------+--------------------+
|summary|           latitude|           longitude|
+-------+-------------------+--------------------+
|  count|                149|                 149|
|   mean| -27.47130457718122|  153.02508301342277|
| stddev|0.01089151504934133|0.015056701432027432|
|    min|         -27.499963|          152.990627|
|    25%|         -27.478653|           153.01476|
|    50%|          -27.47011|          153.026861|
|    75%|         -27.464681|          153.035533|
|    max|         -27.448074|          153.053645|
+-------+-------------------+--------------------+



#### 4. Créer un nouveau DataFrame `kmeans_df` contenant seulement les variables latitude et longitude.

In [20]:
# Keep only the two coordinate columns for the clustering step, then preview the result.
kmeans_df = Brisbane_city_bike.select("latitude", "longitude")
kmeans_df.show()

+----------+----------+
|  latitude| longitude|
+----------+----------+
|-27.482279|153.028723|
| -27.47059|153.036046|
|-27.474531|153.042728|
|-27.461881|153.046986|
|-27.469658|153.016696|
| -27.48172| 153.00436|
|-27.493626|153.001482|
|-27.476076|153.002459|
|-27.493963|153.011938|
|-27.482197|153.020894|
|-27.465226|153.050864|
|-27.468447|153.024662|
|-27.473021|153.025988|
|-27.457825|153.036866|
| -27.48148| 153.02368|
|-27.467464|153.022094|
|-27.499963|153.017633|
|-27.490776|152.994747|
|-27.458199|153.041688|
|-27.481808|153.025477|
+----------+----------+
only showing top 20 rows



#### 5. K-means

In [21]:
# Columns packed, in this order, into the feature vector.
features = ('longitude','latitude')

# Assemble the coordinate columns into a single vector column named "features".
assembler = VectorAssembler(inputCols=features, outputCol="features")
dataset = assembler.transform(kmeans_df)

# K-means with the number of clusters read from the settings and a fixed seed of 1.
kmeans = KMeans(k=num_partition_kmeans, seed=1)
model = kmeans.fit(dataset)

# Apply the fitted model to the same data; the result carries a "prediction" column.
fitted = model.transform(dataset)


#### 6. Quels sont les noms des colonnes de fitted ? Vérifier qu’il s’agit de latitude, longitude, features et prediction.

In [22]:
# `fitted.columns` would list only the column names; show() prints them as the
# table header together with sample rows.
fitted.show()

+----------+----------+--------------------+----------+
|  latitude| longitude|            features|prediction|
+----------+----------+--------------------+----------+
|-27.482279|153.028723|[153.028723,-27.4...|         2|
| -27.47059|153.036046|[153.036046,-27.4...|         2|
|-27.474531|153.042728|[153.042728,-27.4...|         1|
|-27.461881|153.046986|[153.046986,-27.4...|         1|
|-27.469658|153.016696|[153.016696,-27.4...|         2|
| -27.48172| 153.00436|[153.00436,-27.48...|         0|
|-27.493626|153.001482|[153.001482,-27.4...|         0|
|-27.476076|153.002459|[153.002459,-27.4...|         0|
|-27.493963|153.011938|[153.011938,-27.4...|         0|
|-27.482197|153.020894|[153.020894,-27.4...|         2|
|-27.465226|153.050864|[153.050864,-27.4...|         1|
|-27.468447|153.024662|[153.024662,-27.4...|         2|
|-27.473021|153.025988|[153.025988,-27.4...|         2|
|-27.457825|153.036866|[153.036866,-27.4...|         1|
| -27.48148| 153.02368|[153.02368,-27.48...|    

#### 7. Déterminer les longitudes et latitudes moyennes pour chaque groupe en utilisant Spark DSL et SQL. Comparer les résultats.

In [27]:
# DSL version: mean latitude of each cluster (one row per value of "prediction").
(
    fitted
    .groupBy('prediction')
    .agg(F.mean("latitude").alias("mean_lati"))
    .show()
)

+----------+-------------------+
|prediction|          mean_lati|
+----------+-------------------+
|         1|-27.460240636363633|
|         2| -27.47255990624999|
|         0|-27.481218536585374|
+----------+-------------------+



In [28]:
# SQL version: mean latitude of each cluster.
# Register `fitted` as the temporary view "fittedSQL" so that it can be queried by name.
fitted.createOrReplaceTempView("fittedSQL")
spark.sql("""
select prediction , mean(latitude) as mean_lati from fittedSQL 
group by prediction

""").show()


+----------+-------------------+
|prediction|          mean_lati|
+----------+-------------------+
|         1|-27.460240636363633|
|         2| -27.47255990624999|
|         0|-27.481218536585374|
+----------+-------------------+



In [25]:
# DSL version: mean longitude of each cluster (one row per value of "prediction").
(
    fitted
    .groupBy('prediction')
    .agg(F.mean("longitude").alias("mean_longi"))
    .show()
)

+----------+------------------+
|prediction|        mean_longi|
+----------+------------------+
|         1|153.04186302272726|
|         2|   153.02594553125|
|         0|153.00572882926832|
+----------+------------------+



In [26]:
# SQL version: mean longitude of each cluster.
# Queries the temporary view "fittedSQL"; this cell does not create the view itself,
# it is registered by the SQL latitude cell.
spark.sql("""
select prediction , mean(longitude) as mean_longi from fittedSQL 
group by prediction

""").show()


+----------+------------------+
|prediction|        mean_longi|
+----------+------------------+
|         1|153.04186302272726|
|         2|   153.02594553125|
|         0|153.00572882926832|
+----------+------------------+



#### 8. Bonus : faire une visualisation sur une carte avec le package leaflet (correction) (la ville est en Australie et s’appelle BRISBANE et non pas BRISTOL)

###### Pour visualiser l'emplacement des vélos, on a calculé la position moyenne de chaque classe puis représenté ces moyennes sur la carte :
###### chaque vélo représente donc la position moyenne d'un cluster.

In [29]:
# Centre of the map, as [latitude, longitude].
Brisbane_coords=[-27.46897,153.02350]

# Mean [latitude, longitude] of every cluster, computed from `fitted` so the markers
# follow the current clustering instead of numbers copied from an earlier output.
cluster_means = {
    row["prediction"]: [row["mean_lati"], row["mean_longi"]]
    for row in fitted.groupBy("prediction").agg(
        F.mean("latitude").alias("mean_lati"),
        F.mean("longitude").alias("mean_longi"),
    ).collect()
}

# Cluster-to-marker pairing: clusters 1, 2 and 0 respectively.
# Requires cluster ids 0, 1 and 2 to exist, i.e. a k-means level of at least 3.
c1_coords=cluster_means[1]
c2_coords=cluster_means[2]
c3_coords=cluster_means[0]

my_map=folium.Map(location=Brisbane_coords,zoom_start=13)

# One bike picture per cluster marker.
iconvelo=folium.features.CustomIcon('../images/velo.jpg',icon_size=(50,30))
iconvelo2=folium.features.CustomIcon('../images/velo2.jpg',icon_size=(50,30))
iconvelo3=folium.features.CustomIcon('../images/velo3.jpg',icon_size=(50,30))

folium.Marker(c1_coords,popup='classe1', icon=iconvelo).add_to(my_map)
folium.Marker(c2_coords,popup='classe2', icon=iconvelo2).add_to(my_map)
folium.Marker(c3_coords,popup='classe3', icon=iconvelo3).add_to(my_map)

# Last expression of the cell: the notebook renders the map.
my_map

#### 9. Exporter le DataFrame fitted, après élimination de la colonne features, dans le répertoire de sortie (`path_to_output_data`).

In [30]:

# Export the clustered rows, minus the vector column, as CSV text in the output directory.
# Create the directory first so the write does not fail when it is missing.
if path_to_output_data:
    os.makedirs(path_to_output_data, exist_ok=True)
# os.path.join copes with the directory being written with or without a trailing separator.
fitted.drop("features").toPandas().to_csv(os.path.join(path_to_output_data, 'fitted.txt'))
  

In [23]:
#### Shut down the Spark session created at the top of the notebook
spark.stop()