## KMeans Using Spark

### Setting Up Spark in the Colab Environment

In [1]:
# Installing PySpark
!pip install pyspark

# Installing PyDrive, a Python wrapper library for accessing Google Drive
!pip install -U -q PyDrive

# Installing Java 8 (headless).
# Spark is written in Scala and runs on the Java Virtual Machine (JVM), so a JDK must be installed first.
# "Headless" packages leave out the graphical user interface components, which a server environment such as Colab does not need.
!apt install openjdk-8-jdk-headless -qq

# Setting the JAVA_HOME environment variable, which points to the directory where the JDK is installed.
# PySpark uses it to find Java in the Colab environment.
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=f02572bfaa8fdb6afa60749186c59b3d1084056bf75c26dad00b19b3d4fcb8bd
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0
The following package was automatically installed and is no longer required:
  libnvidia-common-460
Use 'apt autoremove
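PySpark launches the JVM using the JDK found through `JAVA_HOME`, so a wrong path only shows up later, when the SparkContext is created. The check below is a small sketch; `java_home_is_valid` is a hypothetical helper, not a PySpark function, and it assumes the usual JDK layout with the launcher at `bin/java`.

```python
import os


def java_home_is_valid(java_home):
    """Return True if java_home looks like a JDK/JRE root containing an executable bin/java."""
    if not java_home:
        return False
    java_bin = os.path.join(java_home, "bin", "java")
    return os.path.isfile(java_bin) and os.access(java_bin, os.X_OK)


# Check the path set in the cell above; prints False if the JDK is not where we expect it.
print(java_home_is_valid(os.environ.get("JAVA_HOME", "")))
```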

## Importing necessary libraries

In [2]:
# Importing pandas for dataframes and data manipulation, and numpy for numerical computations and arrays
import pandas as pd
import numpy as np
# Visualisation libraries
import matplotlib.pyplot as plt
%matplotlib inline


# Importing pyspark, the Python API for Apache Spark, a framework for large-scale data processing
import pyspark

# Importing PySpark SQL, the Spark module that integrates relational processing with Spark's functional programming API.
# The wildcard import brings in SparkSession (the entry point to Spark) and Row, both used below.
from pyspark.sql import *
# Importing pyspark.sql.types, which defines the available Spark SQL data types (StructType, StructField, DoubleType, ...)
from pyspark.sql.types import *

# Importing pyspark.sql.functions, which provides the built-in column functions for DataFrames (e.g. array).
# Note: this wildcard import shadows Python built-ins such as sum, max and abs.
from pyspark.sql.functions import *

# Importing SparkContext and SparkConf
from pyspark import SparkContext, SparkConf

**SparkContext** is the entry point to the Spark environment: every Spark application needs a SparkContext object. A SparkContext represents the connection to a Spark cluster and can be used to create RDDs, accumulators and broadcast variables on that cluster. Only one SparkContext should be active per JVM.

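**SparkConf** holds the configuration of a Spark application as key-value pairs. Its `set(key, value)` method sets a single configuration option, and the finished object is passed to `SparkContext` when the context is created. Once a SparkConf object is passed to Spark, it is cloned and can no longer be modified.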


In [3]:
# Create the configuration: serve the Spark UI on port 4050 instead of the default 4040,
# because port 4040 is used by ngrok's local API further below
conf = SparkConf().set("spark.ui.port", "4050")
# Create the SparkContext from that configuration; getOrCreate() avoids an error
# when the cell is re-run, since only one SparkContext may be active
sc = SparkContext.getOrCreate(conf=conf)

# Create the SparkSession, the entry point to programming Spark with the Dataset and DataFrame API.
# A SparkSession can be used to create DataFrames, register DataFrames as tables, execute SQL over tables, cache tables, and read parquet files.
# SparkSession.builder constructs SparkSession instances; getOrCreate() returns the existing session or,
# if there is none, creates a new one (reusing the SparkContext created above).
spark = SparkSession.builder.getOrCreate()

In [4]:
# Displaying the SparkSession; its summary includes the Spark version
spark

In [5]:
# To view the Spark UI from Colab, we need a public URL for the UI page; ngrok tunnels the UI port (4050) to one.
# In the Spark UI we can monitor the progress of jobs and debug performance bottlenecks.
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip
# Start ngrok in the background, forwarding to the Spark UI port
get_ipython().system_raw('./ngrok http 4050 &')
# Give ngrok a few seconds to open the tunnel before querying it
!sleep 5
# Ask ngrok's local API (port 4040) for the public URL of the first tunnel
!curl -s http://localhost:4040/api/tunnels | python3 -c \
    "import sys, json; print(json.load(sys.stdin)['tunnels'][0]['public_url'])"

--2022-10-20 16:33:47--  https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
Resolving bin.equinox.io (bin.equinox.io)... 52.202.168.65, 18.205.222.128, 54.237.133.81, ...
Connecting to bin.equinox.io (bin.equinox.io)|52.202.168.65|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 13832437 (13M) [application/octet-stream]
Saving to: ‘ngrok-stable-linux-amd64.zip’


2022-10-20 16:33:48 (17.3 MB/s) - ‘ngrok-stable-linux-amd64.zip’ saved [13832437/13832437]

Archive:  ngrok-stable-linux-amd64.zip
  inflating: ngrok                   
Traceback (most recent call last):
  File "<string>", line 1, in <module>
IndexError: list index out of range
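The `IndexError` above means ngrok's local API answered with an empty `tunnels` list, so `[0]` had nothing to index. The log alone does not say why; plausible causes are querying before the tunnel was established or, depending on the ngrok version, ngrok refusing to open a tunnel without an authtoken. A more defensive lookup is sketched below; `first_public_url` and `wait_for_public_url` are hypothetical helpers, and the code assumes the same `/api/tunnels` JSON shape the one-liner above relies on (a `tunnels` list whose entries carry `public_url`).

```python
import http.client
import json
import time
import urllib.request


def first_public_url(payload):
    """Return the first tunnel's public URL from an ngrok /api/tunnels payload, or None."""
    tunnels = payload.get("tunnels", [])
    return tunnels[0].get("public_url") if tunnels else None


def wait_for_public_url(api_url="http://localhost:4040/api/tunnels", attempts=10, delay=1.0):
    """Poll ngrok's local API until a tunnel appears; return None after `attempts` tries."""
    for _ in range(attempts):
        try:
            with urllib.request.urlopen(api_url, timeout=5) as resp:
                url = first_public_url(json.load(resp))
            if url:
                return url
        except (OSError, ValueError, http.client.HTTPException):
            # Connection refused (ngrok not up yet), a bad HTTP reply, or non-JSON content: retry.
            pass
        time.sleep(delay)
    return None


# In the notebook this could replace the curl one-liner:
# print(wait_for_public_url() or "No tunnel found; check that ngrok started correctly.")
```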


## Loading the dataset for further exploration

In [6]:
# Importing the breast cancer dataset loader from scikit-learn
from sklearn.datasets import load_breast_cancer

# Loading the dataset into a Bunch object that holds the feature matrix (data), the labels (target) and the feature names
breast_cancer = load_breast_cancer()
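`load_breast_cancer()` returns a scikit-learn `Bunch`, a dictionary-like object whose fields can also be read as attributes. A quick, self-contained look at the fields used in the following cells (`data`, `target`, `feature_names`, `target_names`) confirms the shapes before anything is handed to Spark:

```python
import numpy as np
from sklearn.datasets import load_breast_cancer

breast_cancer = load_breast_cancer()

# 569 samples x 30 numeric features
print(breast_cancer.data.shape)
# Class names: target value 0 is the first name, 1 the second
print(list(breast_cancer.target_names))
# Number of samples per class (index = target value)
print(np.bincount(breast_cancer.target))
# One name per feature column
print(len(breast_cancer.feature_names))
```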

In [7]:
# Creating a pandas DataFrame from the breast cancer data, using the feature names as column names
pd_df = pd.DataFrame(breast_cancer.data, columns=breast_cancer.feature_names)

# Converting it to a Spark DataFrame
df = spark.createDataFrame(pd_df)

In [8]:
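# Marking every column as non-nullable (the dataset has no missing values), then packing the
# 30 feature columns into one array column and converting each row's array into a dense ML vector
from pyspark.ml.linalg import Vectors

def set_df_columns_nullable(spark, df, column_list, nullable=False):
    """Return a copy of df whose columns in column_list have the given nullability."""
    # Build a new schema rather than mutating df.schema in place
    new_schema = StructType([
        StructField(f.name, f.dataType, nullable if f.name in column_list else f.nullable, f.metadata)
        for f in df.schema
    ])
    return spark.createDataFrame(df.rdd, new_schema)

df = set_df_columns_nullable(spark, df, df.columns)
# array() accepts a list of column names and combines them into a single array<double> column
df = df.withColumn('features', array(df.columns))
# An RDD holding one pyspark.ml dense vector per row
vectors = df.rdd.map(lambda row: Vectors.dense(row.features))

# Printing the schema
df.printSchema()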

root
 |-- mean radius: double (nullable = false)
 |-- mean texture: double (nullable = false)
 |-- mean perimeter: double (nullable = false)
 |-- mean area: double (nullable = false)
 |-- mean smoothness: double (nullable = false)
 |-- mean compactness: double (nullable = false)
 |-- mean concavity: double (nullable = false)
 |-- mean concave points: double (nullable = false)
 |-- mean symmetry: double (nullable = false)
 |-- mean fractal dimension: double (nullable = false)
 |-- radius error: double (nullable = false)
 |-- texture error: double (nullable = false)
 |-- perimeter error: double (nullable = false)
 |-- area error: double (nullable = false)
 |-- smoothness error: double (nullable = false)
 |-- compactness error: double (nullable = false)
 |-- concavity error: double (nullable = false)
 |-- concave points error: double (nullable = false)
 |-- symmetry error: double (nullable = false)
 |-- fractal dimension error: double (nullable = false)
 |-- worst radius: double (nullable

In [9]:
# Importing the linear algebra package (Vectors) from PySpark's machine learning module
from pyspark.ml.linalg import Vectors

# Creating a Spark DataFrame with a single "features" column; .map(Row) wraps each vector of the RDD in a Row
features = spark.createDataFrame(vectors.map(Row), ["features"])

# Creating a pandas Series with the target labels of the dataset (0 = malignant, 1 = benign)
labels = pd.Series(breast_cancer.target)
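`labels` holds the ground-truth diagnosis, which KMeans never sees. Because cluster ids are arbitrary, comparing clusters with labels for k = 2 means scoring both possible cluster-to-label mappings and keeping the better one. The helper below is a hypothetical pandas sketch, not part of PySpark. It assumes the cluster ids come back in the original row order, which is what one would expect from this simple shuffle-free pipeline but which Spark does not formally guarantee.

```python
import numpy as np
import pandas as pd


def cluster_label_agreement(clusters, labels):
    """Return (contingency table, best agreement) for a two-cluster assignment.

    clusters and labels must be aligned element by element and contain only 0/1.
    """
    clusters = pd.Series(np.asarray(clusters), name="cluster")
    labels = pd.Series(np.asarray(labels), name="label")
    table = pd.crosstab(clusters, labels)
    # Cluster ids are arbitrary: score the identity mapping and the swapped one.
    direct = (clusters == labels).mean()
    swapped = (clusters == 1 - labels).mean()
    return table, max(direct, swapped)


# Possible usage once `predictions` exists (see the KMeans cell):
# clusters = predictions.select("prediction").toPandas()["prediction"]
# table, agreement = cluster_label_agreement(clusters, labels)
```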

In [10]:
# Importing the KMeans algorithm from PySpark's machine learning module
from pyspark.ml.clustering import KMeans
# Importing the clustering evaluation metrics
from pyspark.ml.evaluation import ClusteringEvaluator

# Training a k-means model with k = 2 clusters (matching the two diagnosis classes) and a fixed seed for reproducibility
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(features)

# Making predictions: transform() adds a "prediction" column with each row's cluster id
predictions = model.transform(features)

# Evaluating the clustering with the silhouette score; by default ClusteringEvaluator
# computes the silhouette using squared Euclidean distance
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)

print("Silhouette with squared euclidean distance = " + str(silhouette))

Silhouette with squared euclidean distance = 0.8342904262826145
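To make the two steps of the cell above concrete, here is a small NumPy sketch of what they compute. `lloyd_kmeans` runs plain Lloyd iterations (assign each point to its nearest center, move each center to the mean of its points) from given starting centers. Spark's `KMeans` follows the same assign-then-update loop but by default seeds the centers with k-means|| and stops after `maxIter` (20 by default) or once the centers move less than `tol`, so this is an illustration rather than a reimplementation. `silhouette_sq_euclidean` applies the standard silhouette definition with squared Euclidean distances, the default distance measure of `ClusteringEvaluator`; it needs at least two clusters. Both function names are hypothetical.

```python
import numpy as np


def lloyd_kmeans(X, init_centers, max_iter=20, tol=1e-4):
    """Plain Lloyd's k-means starting from init_centers; returns (centers, assignments)."""
    X = np.asarray(X, dtype=float)
    centers = np.array(init_centers, dtype=float)
    for _ in range(max_iter):
        # Squared Euclidean distance from every point to every center: shape (n, k)
        d2 = ((X[:, None, :] - centers[None, :, :]) ** 2).sum(axis=2)
        assign = d2.argmin(axis=1)
        # Move each center to the mean of its points; keep it in place if it has none
        new_centers = np.array([
            X[assign == j].mean(axis=0) if np.any(assign == j) else centers[j]
            for j in range(len(centers))
        ])
        shift = np.sqrt(((new_centers - centers) ** 2).sum(axis=1)).max()
        centers = new_centers
        if shift < tol:
            break
    # Final assignment against the final centers
    d2 = ((X[:, None, :] - centers[None, :, :]) ** 2).sum(axis=2)
    return centers, d2.argmin(axis=1)


def silhouette_sq_euclidean(X, labels):
    """Mean silhouette coefficient using squared Euclidean distance between points."""
    X = np.asarray(X, dtype=float)
    labels = np.asarray(labels)
    d2 = ((X[:, None, :] - X[None, :, :]) ** 2).sum(axis=2)  # pairwise distances, shape (n, n)
    scores = np.zeros(len(X))
    for i in range(len(X)):
        same = labels == labels[i]
        n_same = same.sum()
        if n_same == 1:
            continue  # a point alone in its cluster gets silhouette 0 by convention
        # a: mean distance to the other members of its own cluster (self-distance is 0)
        a = d2[i, same].sum() / (n_same - 1)
        # b: smallest mean distance to the members of any other cluster
        b = min(d2[i, labels == c].mean() for c in np.unique(labels) if c != labels[i])
        scores[i] = (b - a) / max(a, b)
    return scores.mean()


# Toy usage on four 1-D points forming two obvious groups
X = np.array([[0.0], [1.0], [10.0], [11.0]])
centers, assign = lloyd_kmeans(X, init_centers=[[0.0], [1.0]])
print(centers.ravel(), assign)
print(silhouette_sq_euclidean(X, assign))
```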
