In [1]:
import findspark
import pandas as pd
findspark.init("/opt/spark-2.4.0")
from pyspark.sql.functions import *
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import FloatType

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("example_sdx") \
    .getOrCreate()

In [3]:
spark

In [4]:
brisbane = spark.read.option("multiline", "true").json("/Brisbane_CityBike.json")

In [5]:
brisbane.printSchema()

root
 |-- address: string (nullable = true)
 |-- coordinates: struct (nullable = true)
 |    |-- latitude: double (nullable = true)
 |    |-- longitude: double (nullable = true)
 |-- id: double (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- name: string (nullable = true)
 |-- position: string (nullable = true)



In [6]:
brisbane.count()

150

In [7]:
brisbane.where(brisbane.id.isin([19,12])).show()

+--------------------+--------------------+----+----------+---------+--------------------+--------+
|             address|         coordinates|  id|  latitude|longitude|                name|position|
+--------------------+--------------------+----+----------+---------+--------------------+--------+
|Albert St Mall / ...|                null|12.0|-27.468963|153.02461|12 - ALBERT ST MA...|    null|
|Gardens Point Rd ...|[-27.479004, 153....|19.0|      null|     null|19 - GARDENS POIN...|    null|
+--------------------+--------------------+----+----------+---------+--------------------+--------+



In [8]:
brisbane=brisbane.withColumn("latitude", when(brisbane.latitude.isNull(),brisbane.coordinates.latitude) \
    .otherwise(brisbane.latitude)) \
    .withColumn("longitude", when(brisbane.longitude.isNull(),brisbane.coordinates.longitude) \
    .otherwise(brisbane.longitude))
            

In [9]:
brisbane.where(brisbane.id.isin([19,12])).show()

+--------------------+--------------------+----+----------+----------+--------------------+--------+
|             address|         coordinates|  id|  latitude| longitude|                name|position|
+--------------------+--------------------+----+----------+----------+--------------------+--------+
|Albert St Mall / ...|                null|12.0|-27.468963| 153.02461|12 - ALBERT ST MA...|    null|
|Gardens Point Rd ...|[-27.479004, 153....|19.0|-27.479004|153.028853|19 - GARDENS POIN...|    null|
+--------------------+--------------------+----+----------+----------+--------------------+--------+



In [10]:
coords_subset=brisbane.where(brisbane.latitude.isNotNull()).where(brisbane.longitude.isNotNull()).select(brisbane.id,brisbane.latitude,brisbane.longitude)

In [11]:
#coords_subset.show(3,truncate=False)
coords_subset.count()

148

In [37]:
coords_subset.coalesce(1).write.csv('/coords_subset',header = True)


In [13]:
coords_subset_2 = spark.read.csv('/coords_subset/*.csv',header=True).mode("overwrite")

In [14]:
coords_subset_2.show(1)

+-----+----------+----------+
|   id|  latitude| longitude|
+-----+----------+----------+
|122.0|-27.482279|153.028723|
+-----+----------+----------+
only showing top 1 row



In [15]:
coords_subset_2=coords_subset_2.withColumn("latitude", coords_subset_2.latitude.cast(FloatType()))
coords_subset_2=coords_subset_2.withColumn("longitude", coords_subset_2.longitude.cast(FloatType()))
coords_subset_2=coords_subset_2.withColumn("id", coords_subset_2.id.cast(FloatType()))
coords_subset_2=coords_subset_2.where(coords_subset_2.latitude.isNotNull()).where(coords_subset_2.longitude.isNotNull())

In [16]:
vecAssembler = VectorAssembler(inputCols=["latitude", "longitude"], outputCol="features")
data_with_features = vecAssembler.transform(coords_subset_2)
data_with_features.show(3)

+-----+----------+---------+--------------------+
|   id|  latitude|longitude|            features|
+-----+----------+---------+--------------------+
|122.0|-27.482279|153.02872|[-27.482278823852...|
| 91.0| -27.47059|153.03604|[-27.470590591430...|
| 75.0|-27.461882|153.04698|[-27.461881637573...|
+-----+----------+---------+--------------------+
only showing top 3 rows



In [17]:
kmeans = KMeans(k=5,seed=1)  # 2 clusters KMeans(k=2, maxIter=10, seed=1)
model = kmeans.fit(data_with_features.select('features'))
#kmeans.train

In [18]:
transformed = model.transform(data_with_features)
transformed.show(10) 

+-----+----------+---------+--------------------+----------+
|   id|  latitude|longitude|            features|prediction|
+-----+----------+---------+--------------------+----------+
|122.0|-27.482279|153.02872|[-27.482278823852...|         1|
| 91.0| -27.47059|153.03604|[-27.470590591430...|         4|
| 75.0|-27.461882|153.04698|[-27.461881637573...|         2|
| 99.0|-27.469658| 153.0167|[-27.469657897949...|         1|
|109.0| -27.48172|153.00436|[-27.481719970703...|         3|
|149.0|-27.493626|153.00148|[-27.493625640869...|         3|
|139.0|-27.476076|153.00246|[-27.476076126098...|         3|
| 24.0|-27.493963|153.01193|[-27.493963241577...|         0|
|117.0|-27.482197|153.02089|[-27.482196807861...|         1|
| 73.0|-27.465225|153.05086|[-27.465225219726...|         4|
+-----+----------+---------+--------------------+----------+
only showing top 10 rows



In [None]:
transformed.coalesce(1).write.format("json").save("/transformed")

In [29]:
pd_data = transformed.select(transformed.id,transformed.latitude,transformed.longitude,transformed.prediction).toPandas().set_index('id')

In [36]:
pd_data.head(20)

Unnamed: 0_level_0,latitude,longitude,prediction
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
122.0,-27.482279,153.028717,1
91.0,-27.470591,153.036041,4
75.0,-27.461882,153.046982,2
99.0,-27.469658,153.016693,1
109.0,-27.48172,153.004364,3
149.0,-27.493626,153.00148,3
139.0,-27.476076,153.002457,3
24.0,-27.493963,153.011932,0
117.0,-27.482197,153.020889,1
73.0,-27.465225,153.050858,4
