In [1]:
import matplotlib.pyplot as plt
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors
from pyspark.sql import SQLContext
from pyspark.sql.functions import isnan, when, count, col

In [2]:
# Get the Spark connection.
# SparkContext.getOrCreate reuses a context that is already running in this
# kernel, so re-running this cell no longer fails with
# "ValueError: Cannot run multiple SparkContexts at once".
spark_conf = SparkConf().setMaster('local').setAppName('Spark SQL')
spc = SparkContext.getOrCreate(spark_conf)
sqlc = SQLContext(spc)

In [3]:
# Read the Brisbane CityBike JSON file into a Spark DataFrame.
path = "Brisbane_CityBike.json"
sdf = sqlc.read.format("json").load(path)

In [4]:
# Check the state of the data: summary statistics first, then the number of
# missing values per column.
# DataFrame.show() prints the table itself and returns None, so it is called
# directly; wrapping it in print() would add a stray "None" line to the output.
sdf.describe().show()

# One aggregate per column: the count of rows whose value is NaN or null,
# reported under the column's own name.
missing_counts = [
    count(when(isnan(c) | col(c).isNull(), c)).alias(c)
    for c in sdf.columns
]
sdf.select(missing_counts).show()

+-------+--------------------+-------------------+--------------------+--------------------+-----------------+
|summary|             address|           latitude|           longitude|                name|           number|
+-------+--------------------+-------------------+--------------------+--------------------+-----------------+
|  count|                 149|                149|                 149|                 149|              149|
|   mean|                null| -27.47130457718122|  153.02508301342277|                null|83.19463087248322|
| stddev|                null|0.01089151504934133|0.015056701432027432|                null|94.32001864357125|
|    min|38 - Ann St / Cre...|         -27.499963|          152.990627|1 - EDWARD ST / Q...|                1|
|    max|Wickham St / Murr...|         -27.448074|          153.053645|99 - CORDELIA ST ...|             1101|
+-------+--------------------+-------------------+--------------------+--------------------+-----------------+



In [19]:
# K-means clustering on the (latitude, longitude) pairs.

# Columns used as clustering features.
coordination = ['latitude', 'longitude']
df = sdf.select(*coordination)

# Pack the feature columns into a single vector column named "features".
assembler = VectorAssembler(inputCols=coordination, outputCol="features")
X = assembler.transform(df)

# Estimator configured for 3 clusters, with a fixed seed.
kmeans = KMeans(k=3, seed=1)

# Fit the model on the assembled vectors.
model = kmeans.fit(X)

# Run the fitted model over the same rows it was trained on.
results = model.transform(X)

# Print the first 10 result rows.
print(results.head(10))


[Row(latitude=-27.482279, longitude=153.028723, features=DenseVector([-27.4823, 153.0287]), prediction=2), Row(latitude=-27.47059, longitude=153.036046, features=DenseVector([-27.4706, 153.036]), prediction=2), Row(latitude=-27.474531, longitude=153.042728, features=DenseVector([-27.4745, 153.0427]), prediction=1), Row(latitude=-27.461881, longitude=153.046986, features=DenseVector([-27.4619, 153.047]), prediction=1), Row(latitude=-27.469658, longitude=153.016696, features=DenseVector([-27.4697, 153.0167]), prediction=2), Row(latitude=-27.48172, longitude=153.00436, features=DenseVector([-27.4817, 153.0044]), prediction=0), Row(latitude=-27.493626, longitude=153.001482, features=DenseVector([-27.4936, 153.0015]), prediction=0), Row(latitude=-27.476076, longitude=153.002459, features=DenseVector([-27.4761, 153.0025]), prediction=0), Row(latitude=-27.493963, longitude=153.011938, features=DenseVector([-27.494, 153.0119]), prediction=0), Row(latitude=-27.482197, longitude=153.020894, feat