# Big Data Assignment for Clustering

- Name: Abidjanna Zulfa Hamdika
- Student ID: 5025201197
- Class: Big Data A

Sources:
- https://medium.com/@josephgeorgelewis2000/end-to-end-pyspark-clustering-part-i-using-colab-for-pyspark-and-collecting-data-6b94b58baeab

- https://medium.com/@josephgeorgelewis2000/end-to-end-pyspark-clustering-part-ii-preprocessing-and-model-building-in-colab-1c2d0d8f2a23

# Setting Up the PySpark Environment

In [None]:
# set up Java Dev Kit and download Spark and Hadoop 
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget https://downloads.apache.org/spark/spark-3.2.4/spark-3.2.4-bin-hadoop3.2.tgz

# unpack Spark and Hadoop
!tar xf spark-3.2.4-bin-hadoop3.2.tgz

# set up home paths for Java and Spark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.4-bin-hadoop3.2"

--2023-05-30 14:44:22--  https://downloads.apache.org/spark/spark-3.2.4/spark-3.2.4-bin-hadoop3.2.tgz
Resolving downloads.apache.org (downloads.apache.org)... 88.99.95.219, 135.181.214.104, 2a01:4f9:3a:2c57::2, ...
Connecting to downloads.apache.org (downloads.apache.org)|88.99.95.219|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 301183180 (287M) [application/x-gzip]
Saving to: ‘spark-3.2.4-bin-hadoop3.2.tgz.2’


2023-05-30 14:44:49 (11.1 MB/s) - ‘spark-3.2.4-bin-hadoop3.2.tgz.2’ saved [301183180/301183180]



In [None]:
# install, import and initialise findspark 
!pip install -q findspark
import findspark
findspark.init()

# set up spark session 
from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

# Collecting Data from an API

In [None]:
import requests
import json

In [None]:
# Solar System OpenData REST endpoint listing every celestial body.
url = 'https://api.le-systeme-solaire.net/rest/bodies'

# Fields the analysis does not use are excluded from the response
# (see the API docs for the full field list).
params = {'exclude' :'mass,vol,moons,discoveredBy,discoveryDate,alternativeName,axialTilt,avgTemp,mainAnomaly,argPeriapsis,longAscNode,rel,aroundPlanet,sideralOrbit,sideralRotation,dimension,flattening,polarRadius'}

# Make the request and parse the JSON body.
# The timeout stops a stalled connection from hanging the notebook.
# raise_for_status() stops an HTTP error page from being cached as if it were data.
response = requests.get(url, params=params, timeout=30)
response.raise_for_status()
all_data = response.json()

# Save only the `bodies` list so later cells can re-read it without making
# more API calls. Indexing (rather than .get) raises KeyError when the payload
# has no `bodies` element, instead of silently writing `null` to data.json.
with open('data.json', 'w', encoding='utf-8') as f:
    json.dump(all_data['bodies'], f)

In [None]:
from pyspark.sql.types import StructType, StructField, BooleanType, FloatType, StringType

# Expected columns as (name, type, nullable) triples. With an explicit schema,
# only these fields are kept from data.json.
schema_fields = [
  ("id", StringType(), False),
  ("englishName", StringType(), True),
  ("isPlanet", BooleanType(), True),
  ("density", FloatType(), True),
  ("gravity", FloatType(), True),
  ("escape", FloatType(), True),
]
schema = StructType(
  [StructField(name, dtype, nullable) for name, dtype, nullable in schema_fields]
)

# Load the cached API response with those types and preview a few rows.
# NOTE: the JSON reader does not enforce nullable=False; printSchema() in the
# data-cleaning section still reports `id` as nullable = true.
df = spark.read.json('data.json', schema=schema)
df.show(5)

+------+-----------+--------+-------+-------+------+
|    id|englishName|isPlanet|density|gravity|escape|
+------+-----------+--------+-------+-------+------+
|  lune|       Moon|   false|  3.344|   1.62|2380.0|
|phobos|     Phobos|   false|    1.9| 0.0057| 11.39|
|deimos|     Deimos|   false|   1.75|  0.003| 5.556|
|    io|         Io|   false|   3.53|   1.79|   0.0|
|europe|     Europa|   false|   3.01|   1.31|   0.0|
+------+-----------+--------+-------+-------+------+
only showing top 5 rows



# Data Cleaning

In [None]:
from pyspark.sql.functions import when, count, isnull

# every column name in the DataFrame
cols = df.columns

# Count the null cells in each column, shown as a one-row table.
# `count` only counts non-null values, so the CASE expression must produce a
# non-null marker (1) for null rows. `when(isnull(c), c)` would produce the
# null value itself, so count would never see a missing value.
df.select([count(when(isnull(c), 1)).alias(c) for c in cols]).show()

+---+-----------+--------+-------+-------+------+
| id|englishName|isPlanet|density|gravity|escape|
+---+-----------+--------+-------+-------+------+
|  0|          0|       0|      0|      0|     0|
+---+-----------+--------+-------+-------+------+



In [None]:
from pyspark.sql.functions import isnan

# Print the column types to identify the numeric ones.
df.printSchema()

# The float columns; the model-building section also uses them as input features.
num_cols = ['density', 'gravity', 'escape']

# For each numeric column, count the rows holding NaN and show the tallies as a
# one-row table. NaN is a real (non-null) value, so count() sees it.
nan_tallies = [count(when(isnan(name), name)).alias(name) for name in num_cols]
df.select(nan_tallies).show()

# NOTE(review): the df.show(5) preview lists escape = 0.0 for Io and Europa,
# which looks like a placeholder for a missing value rather than a measurement.
# Neither the null nor the NaN check catches such zeros; consider counting or
# handling them before clustering.

root
 |-- id: string (nullable = true)
 |-- englishName: string (nullable = true)
 |-- isPlanet: boolean (nullable = true)
 |-- density: float (nullable = true)
 |-- gravity: float (nullable = true)
 |-- escape: float (nullable = true)

+-------+-------+------+
|density|gravity|escape|
+-------+-------+------+
|      0|      0|     0|
+-------+-------+------+



In [None]:
import pandas as pd
import plotly.express as px


def _plot_gravity_vs_escape(spark_frame):
    """Convert *spark_frame* to pandas, show a gravity-vs-escape scatter, and
    return the (pandas frame, figure) pair."""
    local_frame = spark_frame.toPandas()
    figure = px.scatter(local_frame, 'gravity', 'escape', hover_name='englishName', template='ggplot2')
    figure.show()
    return local_frame, figure


# Plot every body to spot obvious outliers by eye.
outliers, scatter = _plot_gravity_vs_escape(df)

# Jupiter stood out in the plot above, so drop it from the Spark DataFrame.
df = df.filter(df.id != 'jupiter')

# Re-plot to confirm the removal worked.
outliers, scatter = _plot_gravity_vs_escape(df)

# Model Building

In [None]:
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.clustering import KMeans
from pyspark.ml import Pipeline

# Stage 1: pack the numeric columns into a single vector column `features`.
assemble = VectorAssembler().setInputCols(num_cols).setOutputCol('features')
# Stage 2: rescale that vector (StandardScaler defaults) into `standardized`.
scale = StandardScaler().setInputCol('features').setOutputCol('standardized')
# Stage 3: cluster on the rescaled vector. k and seed are configured on this
# stage later, through pipe.getStages()[-1].
km = KMeans().setFeaturesCol('standardized')

# Chain the stages so a single fit()/transform() runs them in order.
pipe = Pipeline().setStages([assemble, scale, km])

In [None]:
from pyspark.ml.evaluation import ClusteringEvaluator

# Silhouette evaluator, pointed at the same column the KMeans stage clusters on.
# By default ClusteringEvaluator reads `features`, which here holds the
# *unscaled* vector. Scoring that column would judge the clusters in a different
# geometry from the `standardized` one KMeans optimised.
evaluator = ClusteringEvaluator(featuresCol='standardized')

# test every k from 2 to 10 inclusive (range's stop value is exclusive)
for k in range(2, 11):
  # Reconfigure the pipeline's final (KMeans) stage in place with this k and a
  # fixed seed, then refit all stages on the data.
  pipe.getStages()[-1].setK(k).setSeed(1)
  model = pipe.fit(df)

  # cluster assignments for every row, in the `prediction` column
  preds = model.transform(df)

  # silhouette score for this k, printed as formatted output
  silhouette = evaluator.evaluate(preds)
  print(f'Tested: {k} clusters: {silhouette}')

Tested: 2 clusters: 0.985906445228239
Tested: 3 clusters: 0.9853183536337629
Tested: 4 clusters: 0.9288686572576516
Tested: 5 clusters: 0.9063524257467865
Tested: 6 clusters: 0.8596777095050427
Tested: 7 clusters: 0.8360492437185751
Tested: 8 clusters: -0.7386038523165039
Tested: 9 clusters: 0.8514223592590151


# Evaluation

In [None]:
# Configure the final (KMeans) stage with a hard-coded k=3 and the same seed
# used in the sweep.
pipe.getStages()[-1].setK(3).setSeed(1)

# fit model and transform the data, showing a few rows to check the output
model = pipe.fit(df)
clusters = model.transform(df)
clusters.show(5)

# `prediction` is an integer column, which Plotly Express would map to a
# continuous colour scale. As strings, the cluster ids become discrete
# categories, each with its own legend entry.
vis_df = clusters.toPandas().astype({'prediction': str})

# 3-D scatter of the numeric features, coloured by cluster and with a marker
# symbol per isPlanet value
fig = px.scatter_3d(
    vis_df,
    x='gravity',
    y='escape',
    z='density',
    color='prediction',
    symbol='isPlanet',
    template='ggplot2',
    hover_name='englishName',
)
fig.show()

+------+-----------+--------+-------+-------+------+--------------------+--------------------+----------+
|    id|englishName|isPlanet|density|gravity|escape|            features|        standardized|prediction|
+------+-----------+--------+-------+-------+------+--------------------+--------------------+----------+
|  lune|       Moon|   false|  3.344|   1.62|2380.0|[3.34400010108947...|[5.04482713061263...|         0|
|phobos|     Phobos|   false|    1.9| 0.0057| 11.39|[1.89999997615814...|[2.86637892886519...|         0|
|deimos|     Deimos|   false|   1.75|  0.003| 5.556|[1.75,0.003000000...|[2.64008588866244...|         0|
|    io|         Io|   false|   3.53|   1.79|   0.0|[3.52999997138977...|[5.32543034939712...|         0|
|europe|     Europa|   false|   3.01|   1.31|   0.0|[3.00999999046325...|[4.54094771411208...|         0|
+------+-----------+--------+-------+-------+------+--------------------+--------------------+----------+
only showing top 5 rows

