# **K-Means using Spark**

### **Dataset - Breast Cancer Dataset**

## **Installing Packages**

In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
     |████████████████████████████████| 281.3 MB 46 kB/s
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
     |████████████████████████████████| 199 kB 50.7 MB/s
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=e644a2076c383a8bb1710e35b06481bcea8e55c6abdcbc67c90468e3d8b225eb
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


**PySpark** is the Python API (application programming interface) for Apache Spark, an open-source, distributed computing framework and set of libraries for real-time, large-scale data processing.

1.  **Apache Spark** is a computational engine that processes very large datasets in parallel across the machines (or cores) of a cluster, for both batch and streaming workloads. Spark is written in Scala, and PySpark was released so that Spark can be used from Python.

2.  PySpark can be installed on its own with the **pip** Python package installer, as in the cell above. pip also installs its dependency **py4j**, the bridge PySpark uses to communicate with the JVM.
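A quick way to confirm the installation from Python is sketched below using only the standard library. `installed_version` is a hypothetical helper name; it reports `None` for any package that is not installed.

```python
import importlib.metadata
import importlib.util


def installed_version(package):
    """Return the installed version of `package`, or None if it is not installed."""
    # find_spec returns None when the package cannot be imported at all
    if importlib.util.find_spec(package) is None:
        return None
    try:
        return importlib.metadata.version(package)
    except importlib.metadata.PackageNotFoundError:
        # importable, but no distribution metadata (e.g. a plain stdlib module)
        return None


print("pyspark:", installed_version("pyspark"))
print("py4j:", installed_version("py4j"))
```

After the pip cell above has run, both lines should print a version string rather than `None`.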





**Installing PyDrive**

In [None]:
!pip install -U -q PyDrive

1. PyDrive is a high-level Python wrapper for the Google Drive API. It allows you to easily upload, download, and delete files in your Google Drive from a Python script. 


**Installing Java Development Kit 8**

In [None]:
!apt install openjdk-8-jdk-headless -qq

The following package was automatically installed and is no longer required:
  libnvidia-common-460
Use 'apt autoremove' to remove it.
The following additional packages will be installed:
  openjdk-8-jre-headless
Suggested packages:
  openjdk-8-demo openjdk-8-source libnss-mdns fonts-dejavu-extra
  fonts-ipafont-gothic fonts-ipafont-mincho fonts-wqy-microhei
  fonts-wqy-zenhei fonts-indic
The following NEW packages will be installed:
  openjdk-8-jdk-headless openjdk-8-jre-headless
0 upgraded, 2 newly installed, 0 to remove and 22 not upgraded.
Need to get 36.6 MB of archives.
After this operation, 143 MB of additional disk space will be used.
Selecting previously unselected package openjdk-8-jre-headless:amd64.
(Reading database ... 123941 files and directories currently installed.)
Preparing to unpack .../openjdk-8-jre-headless_8u342-b07-0ubuntu1~18.04_amd64.deb ...
Unpacking openjdk-8-jre-headless:amd64 (8u342-b07-0ubuntu1~18.04) ...
Selecting previously unselected package openjdk-8-



1. Spark is written in the Scala programming language and runs on the Java Virtual Machine (JVM), so Java must be installed before PySpark can start a Spark session.


2. **OpenJDK (Open Java Development Kit)** is a free and open-source implementation of the Java Platform, Standard Edition. Its source code is the basis for many Java distributions, giving users a wide choice of runtimes on which to run their applications.


3. The **headless** variant of a package leaves out the graphical user interface components, so it can run on a machine without a display. Such software receives input and produces output through other interfaces, such as the network or a serial port, and is common on servers and embedded devices.
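Before starting Spark, it can help to check that a Java runtime is actually reachable. Below is a minimal standard-library sketch, assuming the `java` launcher ends up on the `PATH` (which the apt package usually arranges); `java_version` is a hypothetical helper name.

```python
import shutil
import subprocess


def java_version():
    """Return the first line of `java -version`, or None if no `java` is on PATH."""
    java = shutil.which("java")
    if java is None:
        return None
    # `java -version` prints its banner to stderr rather than stdout
    result = subprocess.run([java, "-version"], capture_output=True, text=True)
    lines = result.stderr.strip().splitlines()
    return lines[0] if lines else None


print(java_version())
```

With OpenJDK 8 installed, the printed line typically starts with `openjdk version "1.8.0_`.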



**Importing OS and setting JAVA environment**

In [None]:
import os
# Point JAVA_HOME at the OpenJDK 8 installed by apt above
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

 

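1. The `os` module (**os — Miscellaneous operating system interfaces**) gives access to the process environment through `os.environ`, a mapping of environment variable names to values. Assigning to it changes the environment of the notebook process and of every child process started afterwards.

2. Now that all the necessary dependencies are installed in Colab, we set `JAVA_HOME` to the OpenJDK 8 installation directory so that PySpark can find Java when it launches the JVM in the Colab environment.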
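Setting `os.environ` works because PySpark launches the JVM as a child process, and child processes inherit the parent's environment. The standard-library sketch below demonstrates that inheritance, with a Python child process standing in for the JVM.

```python
import os
import subprocess
import sys

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

# Start a child process after the assignment; it inherits the updated environment
child = subprocess.run(
    [sys.executable, "-c", "import os; print(os.environ['JAVA_HOME'])"],
    capture_output=True,
    text=True,
    check=True,
)
print(child.stdout.strip())  # /usr/lib/jvm/java-8-openjdk-amd64
```

The variable therefore has to be set before the SparkContext is created: a JVM that is already running does not see later changes to `os.environ`.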



## **Importing PySpark, SparkContext & SparkConf**

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

import pyspark
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf



1.   `import pyspark` imports the PySpark package. Because it was installed with pip, it is already on the Python path, so no helper such as findspark is needed to locate it at runtime.
2.  **from pyspark.sql import * --->** PySpark SQL is the Spark module that integrates relational processing with Spark's functional programming API. It provides `SparkSession`, `DataFrame`, and `Row`, and lets us query data with SQL, using the same queries we would write in standard SQL.

3. **from pyspark.sql.types import * --->** imports Spark SQL's data types, such as `StructType`, `StructField`, and `DoubleType`, which describe a DataFrame's schema.

4. **from pyspark.sql.functions import * -->** imports the built-in functions for DataFrame columns, such as `array()` used below. This wildcard import also shadows Python built-ins that share a name, such as `sum`, `min`, `max`, and `abs`.

5. **from pyspark import SparkContext, SparkConf --->** `SparkConf` holds a Spark application's configuration as key-value pairs, and `SparkContext` is the low-level entry point that connects to Spark and creates RDDs.


## **Creating Session & Context**

In [None]:
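# Configuration: serve the Spark UI on port 4050 instead of the default 4040
conf = SparkConf().set("spark.ui.port", "4050")

# Create the SparkContext (the connection to Spark) with this configuration
sc = pyspark.SparkContext(conf=conf)
# Create the SparkSession; getOrCreate() reuses the SparkContext created above
spark = SparkSession.builder.getOrCreate()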

In [None]:
spark



1. A **SparkSession** can be used to create DataFrames, register DataFrames as tables, execute SQL over tables, cache tables, and read Parquet files.

2. We import SparkSession from **pyspark.sql** and create one; it is the entry point to Spark's DataFrame and SQL functionality.

3. You can give the session a name with **builder.appName()** and add configuration options with **builder.config()** if you wish.

4. **builder.getOrCreate()**
Gets an existing SparkSession or, if there is none, creates a new one based on the options set in this builder. The method first checks whether there is a valid global default SparkSession and, if so, returns it. If no valid global default SparkSession exists, it creates a new SparkSession and assigns it as the global default. Here, the new session reuses the SparkContext `sc` created in the same cell.

5. Finally, evaluating `spark` in a cell displays a summary of the session, including a link to the Spark UI.
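The get-or-create logic of item 4 can be modeled in a few lines of plain Python. This is a toy sketch, not PySpark code: `Session` and `Builder` are hypothetical stand-ins, and following the PySpark documentation, options set on a builder are applied to an existing session when that session is returned.

```python
class Session:
    """Hypothetical stand-in for SparkSession, modelling only getOrCreate()."""

    _default = None  # the "global default" session

    def __init__(self, options):
        self.options = dict(options)
        self.stopped = False

    def stop(self):
        self.stopped = True


class Builder:
    """Hypothetical stand-in for SparkSession.Builder."""

    def __init__(self):
        self._options = {}

    def appName(self, name):
        return self.config("spark.app.name", name)

    def config(self, key, value):
        self._options[key] = value
        return self  # return self so calls can be chained

    def getOrCreate(self):
        current = Session._default
        if current is not None and not current.stopped:
            # A valid default exists: reuse it, applying this builder's options
            current.options.update(self._options)
            return current
        # Otherwise create a new session and make it the global default
        Session._default = Session(self._options)
        return Session._default


first = Builder().appName("kmeans").getOrCreate()
second = Builder().getOrCreate()
print(first is second)  # True: the existing default is returned
first.stop()
third = Builder().getOrCreate()
print(third is first)  # False: a stopped session is not reused
```

This is why running the session cell twice in a notebook hands back the same session instead of starting a second one.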




## **Downloading ngrok**

In [15]:
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip


--2022-10-20 17:53:09--  https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
Resolving bin.equinox.io (bin.equinox.io)... 52.202.168.65, 54.161.241.46, 54.237.133.81, ...
Connecting to bin.equinox.io (bin.equinox.io)|52.202.168.65|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 13832437 (13M) [application/octet-stream]
Saving to: ‘ngrok-stable-linux-amd64.zip.1’


2022-10-20 17:53:10 (40.1 MB/s) - ‘ngrok-stable-linux-amd64.zip.1’ saved [13832437/13832437]



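1. wget is a convenient and widely supported tool for downloading files over three protocols: HTTP, HTTPS, and FTP. It owes its popularity to two of its main features: recursive downloading and robustness on slow or unstable connections.
Here we download ngrok for 64-bit Linux. ngrok creates a secure tunnel from a public URL to a port on the local machine, which lets us reach the Spark UI running inside Colab. Because a copy of the archive already existed from an earlier run, wget saved this one as `ngrok-stable-linux-amd64.zip.1` instead of overwriting it.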
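Without wget, the same single-file download can be done with the standard library. The sketch below also mimics wget's default of not overwriting an existing file, which is where the `.zip.1` name in the output above comes from. `download` and `unique_name` are hypothetical helper names, and the sketch has no retries or recursion, so it does not reproduce wget's robustness features.

```python
import shutil
import urllib.request
from pathlib import Path


def unique_name(path):
    """Mimic wget's default naming: if `path` exists, try path.1, path.2, ..."""
    path = Path(path)
    if not path.exists():
        return path
    n = 1
    while True:
        candidate = path.with_name(f"{path.name}.{n}")
        if not candidate.exists():
            return candidate
        n += 1


def download(url, dest_dir="."):
    """Download `url` into `dest_dir` without overwriting existing files."""
    name = url.rstrip("/").rsplit("/", 1)[-1]
    target = unique_name(Path(dest_dir) / name)
    # Stream the response body to disk instead of reading it all into memory
    with urllib.request.urlopen(url) as response, open(target, "wb") as out:
        shutil.copyfileobj(response, out)
    return target
```

For example, `download("https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip")` fetches the same archive as the cell above.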

## **Unzipping ngrok**

In [16]:
!unzip ngrok-stable-linux-amd64.zip


Archive:  ngrok-stable-linux-amd64.zip
replace ngrok? [y]es, [n]o, [A]ll, [N]one, [r]ename: y
  inflating: ngrok                   


1. The previous cell downloaded ngrok as a zip archive. Here, the `unzip` command extracts the `ngrok` binary from it; because an `ngrok` file already existed from an earlier run, unzip asked whether to replace it.
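The same extraction can be sketched with Python's `zipfile` module. Two differences from `unzip` matter here: `ZipFile.extract` overwrites an existing file without prompting, and it does not restore Unix permission bits, so the binary must be marked executable by hand. `extract_executable` is a hypothetical helper name.

```python
import os
import stat
import zipfile


def extract_executable(zip_path, member, dest_dir="."):
    """Extract one member of a zip archive and mark it executable."""
    with zipfile.ZipFile(zip_path) as archive:
        # Unlike `unzip`, ZipFile.extract overwrites an existing file without asking
        path = archive.extract(member, dest_dir)
    # zipfile does not restore Unix permission bits, so set the execute bits ourselves
    mode = os.stat(path).st_mode
    os.chmod(path, mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)
    return path
```

For the archive above, this would be `extract_executable("ngrok-stable-linux-amd64.zip", "ngrok")`.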

## **Exposing the Spark UI through a public link**

In [19]:
get_ipython().system_raw('./ngrok http 4050 &')

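1. In the previous cells we set `spark.ui.port` to 4050 and started a Spark session, so the Spark UI for this session is served on port 4050 of the Colab VM, which is not reachable from outside. This command starts ngrok in the background (the trailing `&`) and opens an HTTP tunnel from a public ngrok URL to that port.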

In [17]:
!curl -s http://localhost:4040/api/tunnels | python3 -c \
    "import sys, json; print(json.load(sys.stdin)['tunnels'][0]['public_url'])"

https://123a-34-133-227-143.ngrok.io


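1. ngrok serves its own local web interface and API on port 4040, which is why the Spark UI was moved from its default port 4040 to 4050. This cell queries `/api/tunnels` on that API, parses the JSON response, and prints the public URL of the first tunnel. Opening that URL shows the Spark UI, where we can view the jobs and their stages.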
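The `curl | python3` one-liner can also be written as a small Python function. This sketch assumes the ngrok agent API answers on `http://localhost:4040/api/tunnels` as in the cell above and that each tunnel entry has a `public_url` field; unlike the `[0]` index in the one-liner, it returns `None` when no tunnel is up and prefers an HTTPS URL, both design choices of this sketch. The function names are hypothetical.

```python
import json
import urllib.request


def public_url(payload):
    """Pick a tunnel's public URL from an /api/tunnels response, preferring HTTPS."""
    tunnels = payload.get("tunnels", [])
    if not tunnels:
        return None
    for tunnel in tunnels:
        if tunnel.get("public_url", "").startswith("https://"):
            return tunnel["public_url"]
    return tunnels[0].get("public_url")


def fetch_public_url(api="http://localhost:4040/api/tunnels"):
    """Query the local ngrok agent API and return the tunnel's public URL."""
    with urllib.request.urlopen(api, timeout=5) as response:
        return public_url(json.load(response))
```

Separating the parsing from the HTTP call keeps `public_url` testable without a running ngrok agent.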

## **Importing required Libraries**

**Data Processing**

In [None]:
from sklearn.datasets import load_breast_cancer
breast_cancer = load_breast_cancer()

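1. `load_breast_cancer()` loads the Breast Cancer Wisconsin (Diagnostic) dataset bundled with scikit-learn: 569 samples with 30 numeric features each, and a binary target (0 = malignant, 1 = benign).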
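A quick look at the arrays returned by `load_breast_cancer()` confirms the shape and class balance before anything is handed to Spark. This uses only scikit-learn and NumPy.

```python
import numpy as np
from sklearn.datasets import load_breast_cancer

breast_cancer = load_breast_cancer()
print(breast_cancer.data.shape)               # (569, 30)
print(breast_cancer.target_names.tolist())    # ['malignant', 'benign']
print(np.bincount(breast_cancer.target))      # [212 357]
```

The two classes are not balanced, which is worth keeping in mind when the two clusters found later are compared with the true diagnoses.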

In [None]:
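from pyspark.ml.linalg import Vectors  # used by the lambda below

# Build a pandas DataFrame first (the dataset is small), then convert it to Spark
pd_df = pd.DataFrame(breast_cancer.data, columns=breast_cancer.feature_names)
df = spark.createDataFrame(pd_df)

def set_df_columns_nullable(spark, df, column_list, nullable=False):
    """Return a new DataFrame whose listed columns have the given nullable flag."""
    for struct_field in df.schema:
        if struct_field.name in column_list:
            struct_field.nullable = nullable
    # Rebuild the DataFrame from its RDD so the modified schema takes effect
    df_mod = spark.createDataFrame(df.rdd, df.schema)
    return df_mod

df = set_df_columns_nullable(spark, df, df.columns)
# Collect the values of all 30 feature columns into one array<double> column
df = df.withColumn('features', array(df.columns))
# Turn each array into an MLlib DenseVector (lazy: runs at the first action)
vectors = df.rdd.map(lambda row: Vectors.dense(row.features))

df.printSchema()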

root
 |-- mean radius: double (nullable = false)
 |-- mean texture: double (nullable = false)
 |-- mean perimeter: double (nullable = false)
 |-- mean area: double (nullable = false)
 |-- mean smoothness: double (nullable = false)
 |-- mean compactness: double (nullable = false)
 |-- mean concavity: double (nullable = false)
 |-- mean concave points: double (nullable = false)
 |-- mean symmetry: double (nullable = false)
 |-- mean fractal dimension: double (nullable = false)
 |-- radius error: double (nullable = false)
 |-- texture error: double (nullable = false)
 |-- perimeter error: double (nullable = false)
 |-- area error: double (nullable = false)
 |-- smoothness error: double (nullable = false)
 |-- compactness error: double (nullable = false)
 |-- concavity error: double (nullable = false)
 |-- concave points error: double (nullable = false)
 |-- symmetry error: double (nullable = false)
 |-- fractal dimension error: double (nullable = false)
 |-- worst radius: double (nullable

1. For convenience, given that the dataset is small, I first construct a pandas DataFrame, convert it into a Spark DataFrame, and then tune the schema.

2. Converting the data to a pandas DataFrame, with the feature names as column names.

3. Converting this pandas DataFrame to a Spark DataFrame with the `createDataFrame` method.

4. Each field of `df.schema` is a `StructField` with three parts: **a name, a data type (here `DoubleType`), and a `nullable` flag, which is `True` by default.**

The function **`set_df_columns_nullable`** sets `nullable` to **False** for every listed column, declaring that no column may contain **null** values. Changing the Python schema object does not change the DataFrame itself, so the function then converts the DataFrame to an RDD (using `.rdd`) and builds a new DataFrame from that RDD and the modified schema.

5. Using `.withColumn`, we add another column, called `features`, in which each row holds an array of the values of all the other columns. Finally, `df.rdd.map(...)` turns each of these arrays into a dense MLlib vector.

In [20]:
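from pyspark.ml.linalg import Vectors
# Wrap each DenseVector in a Row to get a one-column DataFrame named "features"
features = spark.createDataFrame(vectors.map(Row), ["features"])
# True diagnoses (0 = malignant, 1 = benign), kept for comparison with the clusters
labels = pd.Series(breast_cancer.target)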

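1. `pyspark.ml.linalg.Vectors` provides factory methods for MLlib vectors: `Vectors.dense` builds a `DenseVector`, which stores its values in a NumPy array, and `Vectors.sparse` builds a `SparseVector`, which stores only the non-zero entries. The DataFrame-based estimators in `pyspark.ml`, such as `KMeans`, expect the features column to contain these vector types, so each vector is wrapped in a `Row` and collected into a one-column DataFrame named `features`.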
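The difference between the two representations can be sketched in plain Python: a sparse vector keeps only its size plus the indices and values of its non-zero entries. These three pieces correspond to the arguments of `Vectors.sparse(size, indices, values)`; the helpers `to_sparse` and `to_dense` are hypothetical names used only for illustration.

```python
def to_sparse(dense):
    """Return (size, indices, values) for the non-zero entries of a dense list."""
    indices = [i for i, v in enumerate(dense) if v != 0]
    values = [float(dense[i]) for i in indices]
    return len(dense), indices, values


def to_dense(size, indices, values):
    """Rebuild the dense list from a (size, indices, values) triple."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = float(v)
    return dense


size, indices, values = to_sparse([0.0, 3.0, 0.0, 1.5])
print(size, indices, values)  # 4 [1, 3] [3.0, 1.5]
```

The breast cancer features are mostly non-zero, so the sparse form would save nothing here and the dense form used above is the natural choice.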

In [None]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# trains a K-means model
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(features)

# Make Predictions
prediction = model.transform(features)

# Evaluate clustering by computing the silhouette score
evaluator = ClusteringEvaluator()

silhouette = evaluator.evaluate(prediction)
print('Silhouette with squared euclidean distance = ' + str(silhouette))

Silhouette with squared euclidean distance = 0.8342904262826145


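1. **from pyspark.ml.clustering import KMeans -->** `KMeans` is the k-means estimator in MLlib. Built on top of Spark, MLlib is a scalable machine learning library that provides a uniform set of high-level APIs that help users create and tune practical machine learning pipelines. `setK(2)` asks for two clusters, matching the two diagnoses, and `setSeed(1)` makes the random initialization reproducible (Spark uses the k-means|| initialization by default). `fit()` returns a `KMeansModel`, and `transform()` adds a `prediction` column holding each row's cluster index.

2. **from pyspark.ml.evaluation import ClusteringEvaluator --->** An evaluator for clustering results, which expects two input columns: `prediction` and `features`. By default it computes the silhouette measure using the squared Euclidean distance. The silhouette ranges from -1 to 1; values close to 1 mean that points are much closer to their own cluster than to the nearest other cluster, so 0.83 indicates fairly well-separated clusters. It measures cluster separation only, not agreement with the true diagnoses.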
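To make the two steps concrete, here is a small NumPy sketch of what the estimator and the evaluator compute. It is plain Lloyd's k-means with a random-sample initialization, not Spark's distributed k-means|| implementation, and the silhouette follows the textbook definition with squared Euclidean distance; Spark's evaluator may compute its averages slightly differently. The function names and the toy data are illustrative.

```python
import numpy as np


def squared_distances(X, Y):
    """Matrix of squared Euclidean distances between rows of X and rows of Y."""
    return ((X[:, None, :] - Y[None, :, :]) ** 2).sum(axis=2)


def kmeans(X, k, n_iter=20, seed=1):
    """Plain Lloyd's k-means with random-sample initialization (not k-means||)."""
    X = np.asarray(X, dtype=float)
    rng = np.random.default_rng(seed)
    centers = X[rng.choice(len(X), size=k, replace=False)]
    for _ in range(n_iter):
        labels = squared_distances(X, centers).argmin(axis=1)  # assignment step
        # Update step: move each center to the mean of its points (keep it if empty)
        new_centers = np.array([
            X[labels == j].mean(axis=0) if np.any(labels == j) else centers[j]
            for j in range(k)
        ])
        if np.allclose(new_centers, centers):
            break
        centers = new_centers
    return squared_distances(X, centers).argmin(axis=1), centers


def silhouette(X, labels):
    """Mean silhouette using squared Euclidean distance (textbook definition)."""
    X = np.asarray(X, dtype=float)
    labels = np.asarray(labels)
    d2 = squared_distances(X, X)
    scores = np.zeros(len(X))
    for i in range(len(X)):
        own = labels == labels[i]
        if own.sum() == 1:
            continue  # singleton cluster: score 0 by convention
        # a: mean distance to the other points of its own cluster (d2[i, i] is 0)
        a = d2[i, own].sum() / (own.sum() - 1)
        # b: smallest mean distance to the points of any other cluster
        b = min(d2[i, labels == c].mean() for c in np.unique(labels) if c != labels[i])
        if max(a, b) > 0:
            scores[i] = (b - a) / max(a, b)
    return scores.mean()


X = np.array([[0, 0], [0, 1], [1, 0], [10, 10], [10, 11], [11, 10]])
labels, centers = kmeans(X, k=2)
print(labels)
print(round(silhouette(X, labels), 3))
```

On this toy data the two blobs end up in different clusters and the silhouette is close to 1. Calling `kmeans(breast_cancer.data, k=2)` applies the same idea to the real features on a single machine.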