# **K-Means using Spark**

#### Setup

`PySpark` is the `Python API` for `Apache Spark`, an open source, distributed computing framework and set of libraries for real-time, large-scale data processing. 

Let's set up Spark in your Colab environment.

In [2]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
     |████████████████████████████████| 281.3 MB 38 kB/s
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
     |████████████████████████████████| 199 kB 51.2 MB/s
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=7b5ef7216e52a3f0747dc6e2d1e93c412b4c47fa414b2035d3324e5158337b6c
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


PyDrive is a high-level Python wrapper for the `Google Drive API`. It allows you to easily upload, download, and delete files in your Google Drive from a Python script.

PyDrive is a simple way to integrate Google Drive into your applications.

In [3]:
!pip install -U -q PyDrive
# -q, --quiet: reduces the output produced by pip without affecting the installation itself.
#   A single -q displays only messages at the WARNING, ERROR, and CRITICAL log levels.
# -U, --upgrade: upgrades the specified packages to the newest available version.

Spark runs on the Java Virtual Machine, so a Java installation is required. **OpenJDK** is an open-source development environment for building applications and components using the Java programming language.

The **headless** variant is a minimal runtime environment without a graphical interface, which makes it more suitable for server applications. It uses minimal system resources and doesn’t include keyboard or mouse support.

Install **OpenJDK 8 headless** by running the following:

In [4]:
!apt install openjdk-8-jdk-headless -qq
# apt resolves dependencies automatically: it searches the configured repositories for any packages the requested one needs.
# -qq requests the quietest output level (intended to show errors only) and implies -y, so the install runs without a confirmation prompt.

The following package was automatically installed and is no longer required:
  libnvidia-common-460
Use 'apt autoremove' to remove it.
The following additional packages will be installed:
  openjdk-8-jre-headless
Suggested packages:
  openjdk-8-demo openjdk-8-source libnss-mdns fonts-dejavu-extra
  fonts-ipafont-gothic fonts-ipafont-mincho fonts-wqy-microhei
  fonts-wqy-zenhei fonts-indic
The following NEW packages will be installed:
  openjdk-8-jdk-headless openjdk-8-jre-headless
0 upgraded, 2 newly installed, 0 to remove and 27 not upgraded.
Need to get 36.6 MB of archives.
After this operation, 143 MB of additional disk space will be used.
Selecting previously unselected package openjdk-8-jre-headless:amd64.
(Reading database ... 123942 files and directories currently installed.)
Preparing to unpack .../openjdk-8-jre-headless_8u342-b07-0ubuntu1~18.04_amd64.deb ...
Unpacking openjdk-8-jre-headless:amd64 (8u342-b07-0ubuntu1~18.04) ...
Selecting previously unselected package openjdk-8-

The **OS module** in Python provides functions for interacting with the operating system. This module provides a portable way of using operating system-dependent functionality.

The *os* module includes many functions for interacting with the file system.

**os.environ** in Python is a mapping object that represents the environment variables of the current process. It behaves like a dictionary whose keys are the variable names and whose values are the corresponding (string) values.

        Syntax: os.environ

        Parameter: It is a non-callable object, so no parameters are required.

        Return Type: A mapping object representing the environment variables.

**JAVA_HOME** is an operating system (OS) environment variable which can optionally be set after either the *`Java Development Kit (JDK)`* or the *`Java Runtime Environment (JRE)`* is installed. The `JAVA_HOME` environment variable points to the file system location where the `JDK` or `JRE` was installed.

Other programs that require a Java runtime query the OS for the **JAVA_HOME** variable to find out where the runtime is installed.

In [5]:
import os
os.environ["JAVA_HOME"] = '/usr/lib/jvm/java-8-openjdk-amd64'
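To make the behavior of `os.environ` concrete, here is a minimal sketch that uses only the standard library. The variable name `DEMO_JAVA_HOME` and its path are hypothetical and exist only for this illustration. The sketch shows that a value set through `os.environ` is visible to child processes, which is presumably why setting `JAVA_HOME` before starting Spark is enough for the launched JVM to be found.

```python
import os
import subprocess
import sys

# DEMO_JAVA_HOME is a hypothetical variable name used only for this illustration.
os.environ["DEMO_JAVA_HOME"] = "/usr/lib/jvm/demo"

# Reading works like a dictionary; .get() supplies a fallback for unset variables.
demo_home = os.environ["DEMO_JAVA_HOME"]
missing = os.environ.get("DEMO_UNSET_VARIABLE", "<not set>")

# A child process inherits the parent's environment, including the new variable.
child = subprocess.run(
    [sys.executable, "-c", "import os; print(os.environ['DEMO_JAVA_HOME'])"],
    capture_output=True,
    text=True,
    check=True,
)
child_view = child.stdout.strip()

print(demo_home, missing, child_view)
```

Changes made this way last only for the lifetime of the current Python process (and the children it starts); they do not modify the system-wide configuration.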

In [6]:
# importing necessary libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

import pyspark # core PySpark package (installed above)
# A wildcard import (*) pulls every public name from the module into the current namespace.
from pyspark.sql import * # Spark SQL: SparkSession, DataFrame, Row, ... for processing structured, columnar data
from pyspark.sql.types import * # Spark SQL data types (StructType, StructField, DoubleType, ...)
from pyspark.sql.functions import * # built-in column functions available for DataFrames (array, col, ...)
from pyspark import SparkContext, SparkConf # 'SparkContext' is the entry point to Spark's core functionality.
                                            # 'SparkConf' holds the application's configuration as key-value pairs.

**SparkContext** is the entry point to any Spark functionality. When we run a Spark application, a driver program starts; it contains the main function, and the SparkContext is initialized there. The driver program then runs the operations inside the executors on worker nodes.

A **SparkContext** represents the connection to a Spark cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.

Only one **SparkContext** should be active per **JVM**. You must `stop()` the active `SparkContext` before creating a new one.

**SparkConf** holds the configuration for a Spark application. It is used to set various Spark parameters as key-value pairs.

By default, `SparkConf()` also loads values from any `spark.*` Java system properties. In that case, parameters you set directly on the SparkConf object take priority over system properties.

Every SparkContext launches a web UI, by default on port 4040, that displays useful information about the application. This includes:

* A list of scheduler stages and tasks

* A summary of RDD sizes and memory usage

* Environmental information

* Information about the running executors

If multiple SparkContexts are running on the same host, they will bind to successive ports beginning with 4040 (4041, 4042, etc.).

In [7]:
# create the configuration
conf = SparkConf().set("spark.ui.port","4050") # configure the Spark application to serve its web UI on port 4050

# create the context and the session
sc = pyspark.SparkContext(conf = conf) # entry point to Spark, created from the configuration 'conf'
spark = SparkSession.builder.getOrCreate() # gets the existing SparkSession or, if there is none, creates a new one based on the options set in this builder

You can easily check the current version and get the link to the web interface. In the Spark UI, you can monitor the progress of your job and debug performance bottlenecks (if your Colab is running with a **local runtime**).

In [8]:
spark

In [10]:
# 'wget' is a command-line tool for downloading files over HTTP, HTTPS, and FTP.
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip # download the ngrok archive; ngrok exposes a local port through a public URL

--2022-10-20 19:10:21--  https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
Resolving bin.equinox.io (bin.equinox.io)... 54.161.241.46, 52.202.168.65, 54.237.133.81, ...
Connecting to bin.equinox.io (bin.equinox.io)|54.161.241.46|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 13832437 (13M) [application/octet-stream]
Saving to: ‘ngrok-stable-linux-amd64.zip.1’


2022-10-20 19:10:22 (60.5 MB/s) - ‘ngrok-stable-linux-amd64.zip.1’ saved [13832437/13832437]



In [11]:
!unzip ngrok-stable-linux-amd64.zip # Unzipping the ngrok 
get_ipython().system_raw('./ngrok http 4050 &') # get_ipython() - Get the global InteractiveShell instance
# The get_ipython() command allows one to access IPython commands, and system_raw() executes commands in the native command prompt / terminal.
# system_raw - starts the ngrok tunnel via the http protocol to port 4050

Archive:  ngrok-stable-linux-amd64.zip
replace ngrok? [y]es, [n]o, [A]ll, [N]one, [r]ename: y
  inflating: ngrok                   


In [12]:
!curl -s http://localhost:4040/api/tunnels | python3 -c "import sys, json; print(json.load(sys.stdin)['tunnels'][0]['public_url'])"


The `curl` command sends HTTP requests. Here, the request goes to `http://localhost:4040/api/tunnels`, the local `ngrok API`, which reports the tunnels that are currently running. ngrok serves this API on port 4040 by default, so moving the Spark UI to port 4050 keeps the two from competing for the same port.

The JSON response is passed through the Linux pipe operator (`|`) to a short Python 3 script, which reads it from `sys.stdin`, parses it, and prints the public URL of the first tunnel.
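The parsing step described above can be sketched in plain Python. The `sample_response` string below is a hypothetical, heavily shortened payload (only the `tunnels` list and `public_url` key are taken from the command in this notebook), and `first_public_url` is an illustrative helper name, not part of ngrok.

```python
import json

# Hypothetical, shortened stand-in for the body returned by /api/tunnels.
sample_response = """
{"tunnels": [{"name": "command_line",
              "public_url": "https://example.ngrok.io",
              "proto": "https"}]}
"""

def first_public_url(payload):
    """Return the public URL of the first tunnel, or None if no tunnel is listed."""
    tunnels = json.loads(payload).get("tunnels", [])
    return tunnels[0]["public_url"] if tunnels else None

public_url = first_public_url(sample_response)
print(public_url)
```

Returning `None` for an empty list avoids the `IndexError` that the one-liner would raise if the request is made before ngrok has finished opening the tunnel.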

#### Data Pre-processing

In this Colab, rather than downloading a file from Google Drive, we will load a well-known machine learning dataset, the Breast Cancer Wisconsin dataset, using the scikit-learn datasets loader.

In [13]:
from sklearn.datasets import load_breast_cancer # importing python in-built dataset from scikit-learn
breast_cancer = load_breast_cancer() # loading imported data

`SparkSession.createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)` creates a DataFrame from an RDD, a list, or a `pandas.DataFrame`.

* With the default `schema=None`, Spark tries to infer the schema (column names and types) from `data`, which should be an RDD of either `Row`, `namedtuple`, or `dict`.

* If schema inference is needed, `samplingRatio` determines the ratio of rows used for schema inference. The first row is used if `samplingRatio` is `None`.

In [31]:
pd_df  = pd.DataFrame(breast_cancer.data, columns = breast_cancer.feature_names) # creating data frame of independent features
df = spark.createDataFrame(pd_df) 
df

DataFrame[mean radius: double, mean texture: double, mean perimeter: double, mean area: double, mean smoothness: double, mean compactness: double, mean concavity: double, mean concave points: double, mean symmetry: double, mean fractal dimension: double, radius error: double, texture error: double, perimeter error: double, area error: double, smoothness error: double, compactness error: double, concavity error: double, concave points error: double, symmetry error: double, fractal dimension error: double, worst radius: double, worst texture: double, worst perimeter: double, worst area: double, worst smoothness: double, worst compactness: double, worst concavity: double, worst concave points: double, worst symmetry: double, worst fractal dimension: double]

The **schema** property returns a StructType object:

In [15]:
df.schema

StructType([StructField('mean radius', DoubleType(), True), StructField('mean texture', DoubleType(), True), StructField('mean perimeter', DoubleType(), True), StructField('mean area', DoubleType(), True), StructField('mean smoothness', DoubleType(), True), StructField('mean compactness', DoubleType(), True), StructField('mean concavity', DoubleType(), True), StructField('mean concave points', DoubleType(), True), StructField('mean symmetry', DoubleType(), True), StructField('mean fractal dimension', DoubleType(), True), StructField('radius error', DoubleType(), True), StructField('texture error', DoubleType(), True), StructField('perimeter error', DoubleType(), True), StructField('area error', DoubleType(), True), StructField('smoothness error', DoubleType(), True), StructField('compactness error', DoubleType(), True), StructField('concavity error', DoubleType(), True), StructField('concave points error', DoubleType(), True), StructField('symmetry error', DoubleType(), True), StructFi

**StructField**

Each `StructField` models one column in a DataFrame.

`StructField` objects are created with the name, dataType, and nullable properties.

**DoubleType()** represents double-precision floating-point values. It corresponds to Python's `float`, which virtually all platforms store as a 64-bit “double-precision” value.

When the **nullable** field is set to `True`, the column can accept null values.
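The double-precision point above can be checked with the standard library alone. This is a small sketch, assuming a typical CPython build on an IEEE 754 platform; it does not involve Spark at all.

```python
import struct
import sys

# Pack a Python float using the standard-size C double format ("<d").
packed = struct.pack("<d", 0.1)
size_in_bytes = len(packed)

# Unpacking returns the identical value: nothing is lost in the round trip.
round_trip = struct.unpack("<d", packed)[0]

# Number of bits in the significand of a Python float.
mantissa_bits = sys.float_info.mant_dig

print(size_in_bytes * 8, "bits;", mantissa_bits, "significand bits")
```

Because the two representations match, converting a column of Python floats into a `DoubleType` column should not change the stored values.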

For convenience, given that the dataset is small, we first construct a pandas DataFrame, convert it into a Spark DataFrame, and then tune the schema by marking every column as non-nullable.

In [32]:
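def set_df_columns_nullable(spark, df, column_list, nullable=False):
    """Return a DataFrame in which the listed columns have the given nullability."""
    for struct_field in df.schema:
        if struct_field.name in column_list:
            struct_field.nullable = nullable # update the schema metadata in place
    # rebuild the DataFrame from the distributed data (RDD) and the modified schema
    df_mod = spark.createDataFrame(df.rdd, df.schema)
    return df_mod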

In [33]:
df = set_df_columns_nullable(spark, df, df.columns) # calling defined function

In [34]:
df

DataFrame[mean radius: double, mean texture: double, mean perimeter: double, mean area: double, mean smoothness: double, mean compactness: double, mean concavity: double, mean concave points: double, mean symmetry: double, mean fractal dimension: double, radius error: double, texture error: double, perimeter error: double, area error: double, smoothness error: double, compactness error: double, concavity error: double, concave points error: double, symmetry error: double, fractal dimension error: double, worst radius: double, worst texture: double, worst perimeter: double, worst area: double, worst smoothness: double, worst compactness: double, worst concavity: double, worst concave points: double, worst symmetry: double, worst fractal dimension: double]

In [35]:
df = df.withColumn('features', array(df.columns)) # adds a 'features' column that holds the values of all existing columns as a single array
df

DataFrame[mean radius: double, mean texture: double, mean perimeter: double, mean area: double, mean smoothness: double, mean compactness: double, mean concavity: double, mean concave points: double, mean symmetry: double, mean fractal dimension: double, radius error: double, texture error: double, perimeter error: double, area error: double, smoothness error: double, compactness error: double, concavity error: double, concave points error: double, symmetry error: double, fractal dimension error: double, worst radius: double, worst texture: double, worst perimeter: double, worst area: double, worst smoothness: double, worst compactness: double, worst concavity: double, worst concave points: double, worst symmetry: double, worst fractal dimension: double, features: array<double>]

In [38]:
from pyspark.ml.linalg import Vectors # provides Vectors.dense, used below
vectors = df.rdd.map(lambda row: Vectors.dense(row.features)) # map() applies the function to every row and returns an RDD with the same number of elements
print(vectors)
df.printSchema() # printSchema() prints every column with its type; 'features' is an array column

PythonRDD[119] at RDD at PythonRDD.scala:53
root
 |-- mean radius: double (nullable = false)
 |-- mean texture: double (nullable = false)
 |-- mean perimeter: double (nullable = false)
 |-- mean area: double (nullable = false)
 |-- mean smoothness: double (nullable = false)
 |-- mean compactness: double (nullable = false)
 |-- mean concavity: double (nullable = false)
 |-- mean concave points: double (nullable = false)
 |-- mean symmetry: double (nullable = false)
 |-- mean fractal dimension: double (nullable = false)
 |-- radius error: double (nullable = false)
 |-- texture error: double (nullable = false)
 |-- perimeter error: double (nullable = false)
 |-- area error: double (nullable = false)
 |-- smoothness error: double (nullable = false)
 |-- compactness error: double (nullable = false)
 |-- concavity error: double (nullable = false)
 |-- concave points error: double (nullable = false)
 |-- symmetry error: double (nullable = false)
 |-- fractal dimension error: double (nullable 

With the next cell, we build the two data structures that we will be using throughout this Colab:

* `features`, a DataFrame of dense vectors, containing all the original features in the dataset;

* `labels`, a series of binary labels indicating whether the corresponding tumor is malignant (`0`) or benign (`1`).

class **pyspark.ml.linalg.Vectors**

   * Factory methods for working with vectors.

In [39]:
from pyspark.ml.linalg import Vectors
features = spark.createDataFrame(vectors.map(Row), ["features"])
labels = pd.Series(breast_cancer.target)

### Your task

If you have run the Setup and Data Pre-processing stages successfully, you are now ready to cluster the data with the K-means algorithm included in MLlib (Spark's Machine Learning library). Set the `k` parameter to **2**, use a seed of **1**, fit the model, and then compute the Silhouette score (i.e. a measure of the quality of the obtained clustering).

**IMPORTANT:** use the MLlib implementation of the Silhouette score (via `ClusteringEvaluator`).
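For intuition about what fitting a k-means model involves, the following is a minimal pure-Python sketch of Lloyd's algorithm on six hypothetical 2-D points. It is not MLlib's implementation: MLlib runs distributed and defaults to `k-means||` initialization, whereas this sketch simply samples the initial centers at random. The function name `lloyd_kmeans` and the toy data are assumptions made for illustration.

```python
import random

def squared_distance(p, q):
    """Squared Euclidean distance between two points given as tuples."""
    return sum((a - b) ** 2 for a, b in zip(p, q))

def lloyd_kmeans(points, k, seed, iterations=20):
    """Alternate assignment and update steps until the centers stop moving."""
    rng = random.Random(seed)
    centers = rng.sample(points, k)  # naive random initialization
    assignment = []
    for _ in range(iterations):
        # Assignment step: each point joins its nearest center.
        assignment = [
            min(range(k), key=lambda c: squared_distance(p, centers[c]))
            for p in points
        ]
        # Update step: each center moves to the mean of its members.
        new_centers = []
        for c in range(k):
            members = [p for p, a in zip(points, assignment) if a == c]
            if members:
                new_centers.append(
                    tuple(sum(col) / len(members) for col in zip(*members))
                )
            else:
                new_centers.append(centers[c])  # keep an empty cluster's center
        if new_centers == centers:
            break
        centers = new_centers
    return centers, assignment

# Hypothetical toy data: two well-separated groups of three points each.
points = [(0.0, 0.0), (0.0, 1.0), (1.0, 0.0),
          (10.0, 10.0), (10.0, 11.0), (11.0, 10.0)]
centers, assignment = lloyd_kmeans(points, k=2, seed=1)
print(centers, assignment)
```

The seed only controls which points are drawn as initial centers; on data this well separated, the final grouping is the same either way, although the cluster numbering may differ.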

In [40]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Trains a k-means model.
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(features)

# Make predictions
predictions = model.transform(features)

#Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator()

silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

Silhouette with squared euclidean distance = 0.8342904262826145
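To help interpret the number above, here is a minimal pure-Python sketch of the Silhouette score using squared Euclidean distance. It illustrates the definition only and is not how MLlib computes the score internally; the function name `silhouette_score` and the four toy points are assumptions. For each point, `a` is the mean distance to the other members of its own cluster, `b` is the smallest mean distance to the members of any other cluster, and the point's score is `(b - a) / max(a, b)`.

```python
def squared_distance(p, q):
    """Squared Euclidean distance between two points given as tuples."""
    return sum((x - y) ** 2 for x, y in zip(p, q))

def silhouette_score(points, labels):
    """Mean silhouette over all points, using squared Euclidean distance."""
    clusters = {}
    for point, label in zip(points, labels):
        clusters.setdefault(label, []).append(point)

    scores = []
    for point, label in zip(points, labels):
        own = clusters[label]
        if len(own) == 1:
            scores.append(0.0)  # convention: a singleton cluster scores 0
            continue
        # a: mean distance to the other members of the point's own cluster
        # (the point's distance to itself is 0, so divide by len(own) - 1).
        a = sum(squared_distance(point, q) for q in own) / (len(own) - 1)
        # b: smallest mean distance to the members of any other cluster.
        b = min(
            sum(squared_distance(point, q) for q in members) / len(members)
            for other, members in clusters.items()
            if other != label
        )
        scores.append((b - a) / max(a, b))
    return sum(scores) / len(scores)

# Hypothetical toy data: two tight, well-separated clusters.
toy_points = [(0.0, 0.0), (0.0, 1.0), (10.0, 0.0), (10.0, 1.0)]
toy_labels = [0, 0, 1, 1]
toy_score = silhouette_score(toy_points, toy_labels)
print(toy_score)
```

Scores lie between -1 and 1; values close to 1 indicate that points sit much nearer to their own cluster than to any other.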
