# Assignment 3 (Spark)

# **K Means using Spark**

The following code installs PySpark, which we use to work with big data.

In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
     |████████████████████████████████| 281.3 MB 48 kB/s 
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
     |████████████████████████████████| 199 kB 51.8 MB/s 
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=13f65afda5a36ad75a736dfd5d1ca8174ee711ebaa3e3c20ad7dc83261652bad
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [2]:
#updating pydrive
!pip install -U -q PyDrive

In [4]:
!apt install openjdk-8-jdk-headless -qq

The following package was automatically installed and is no longer required:
  libnvidia-common-460
Use 'apt autoremove' to remove it.
The following additional packages will be installed:
  openjdk-8-jre-headless
Suggested packages:
  openjdk-8-demo openjdk-8-source libnss-mdns fonts-dejavu-extra
  fonts-ipafont-gothic fonts-ipafont-mincho fonts-wqy-microhei
  fonts-wqy-zenhei fonts-indic
The following NEW packages will be installed:
  openjdk-8-jdk-headless openjdk-8-jre-headless
0 upgraded, 2 newly installed, 0 to remove and 22 not upgraded.
Need to get 36.6 MB of archives.
After this operation, 143 MB of additional disk space will be used.
Selecting previously unselected package openjdk-8-jre-headless:amd64.
(Reading database ... 123941 files and directories currently installed.)
Preparing to unpack .../openjdk-8-jre-headless_8u342-b07-0ubuntu1~18.04_amd64.deb ...
Unpacking openjdk-8-jre-headless:amd64 (8u342-b07-0ubuntu1~18.04) ...
Selecting previously unselected package openjdk-8-

The following code sets the JAVA_HOME environment variable to the directory of the OpenJDK 8 installation, so that Spark can find Java.

OpenJDK is an open source implementation of the Java Platform, Standard Edition.

In [5]:
import os
os.environ["JAVA_HOME"]="/usr/lib/jvm/java-8-openjdk-amd64"

***Importing necessary libraries***

In [10]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

import pyspark
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf



***Creating the session and context***

SparkSession is a class in the pyspark.sql package. It is a wrapper on top of Spark Context.

When a Spark application is launched with spark-submit, spark-shell or pyspark, a Spark Context is started. It is the application's connection to the Spark cluster and maintains the context of all submitted jobs until it is stopped.

We first create a SparkSession object. It can have any name, but it is typically called spark. Once it is created, it exposes several APIs, including read.

When creating the SparkSession object we usually set at least the application name and the execution mode in which the Spark Context should run. We use appName to specify the name of the application and master to specify the execution mode.

The code cell further below starts the Spark session for us.

**SparkConf()**

Configuration for a Spark application. Used to set various Spark parameters as key-value pairs.

Most of the time, you would create a SparkConf object with SparkConf(), which will load values from spark.* Java system properties as well. In this case, any parameters you set directly on the SparkConf object take priority over system properties.

For unit tests, we can also call SparkConf(loadDefaults=False) (written new SparkConf(false) in Scala) to skip loading external settings and get the same configuration no matter what the system properties are.

All setter methods in this class support chaining.


The Spark user interface, which shows the application's dashboard, listens on port 4040 by default. The property name is **spark.ui.port**.

On the command line the same setting can be passed as --conf spark.ui.port=4050, a feature available since Spark 1.1.


**Port numbers** in computer networking represent communication endpoints. Ports are unsigned 16-bit integers (0-65535) that identify a specific process, or network service. IANA is responsible for internet protocol resources, including the registration of commonly used port numbers for well-known internet services.

Well Known Ports: 0 through 1023.

Registered Ports: 1024 through 49151.

Dynamic/Private Ports: 49152 through 65535.
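
The three ranges above can be expressed as a small lookup. This is only an illustrative sketch: the function name `port_category` is hypothetical and not part of Spark or any library.

```python
def port_category(port: int) -> str:
    """Classify a port number into the IANA range it falls in."""
    if not 0 <= port <= 65535:
        raise ValueError("a port must be an unsigned 16-bit integer (0-65535)")
    if port <= 1023:
        return "well-known"
    if port <= 49151:
        return "registered"
    return "dynamic/private"


# The default Spark UI port (4040) and the one used in this notebook (4050)
for port in (4040, 4050):
    print(port, port_category(port))
```

Both Spark UI ports used in this notebook fall in the registered range.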


In [11]:
# Create the configuration: serve the Spark UI on port 4050
conf = SparkConf().set('spark.ui.port','4050')

# Create the context, then get (or create) the session on top of it
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession.builder.getOrCreate()

We can easily check the current version and get the link to the web interface. In the Spark UI, we can monitor the progress of our job and debug performance bottlenecks (if our Colab is running with a local runtime).

In [13]:
#to know the version of pyspark
spark

If we are running this Colab on the Google-hosted runtime, the cell below will create an ngrok tunnel, which will allow us to still check the Spark UI.

### What is Ngrok?
Ngrok is an application developed by Alan Shreve that enables developers to expose their local development servers to the internet. It creates a tunnel to your local development server and generates two random subdomains on ngrok.com, one with http and another with https. With these generated addresses, you can view the locally developed application from anywhere through the internet, provided that the development server is kept running.

Other than just demoing developed applications, ngrok can also be used for :-

* Running a personal service of your own on the cloud.
* Testing out mobile apps that have a local backend server.
* Building and testing out webhooks.

In [14]:
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip
# Tunnel the Spark UI port (4050) in the background
get_ipython().system_raw('./ngrok http 4050 &')
# ngrok's own local API listens on port 4040; ask it for the tunnel's public URL
!curl -s http://localhost:4040/api/tunnels | python3 -c \
    "import sys, json; print(json.load(sys.stdin)['tunnels'][0]['public_url'])"

--2022-10-19 11:19:29--  https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
Resolving bin.equinox.io (bin.equinox.io)... 54.237.133.81, 54.161.241.46, 52.202.168.65, ...
Connecting to bin.equinox.io (bin.equinox.io)|54.237.133.81|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 13832437 (13M) [application/octet-stream]
Saving to: ‘ngrok-stable-linux-amd64.zip’


2022-10-19 11:19:31 (17.4 MB/s) - ‘ngrok-stable-linux-amd64.zip’ saved [13832437/13832437]

Archive:  ngrok-stable-linux-amd64.zip
  inflating: ngrok                   
Traceback (most recent call last):
  File "<string>", line 1, in <module>
IndexError: list index out of range
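
The IndexError in the output above means that the 'tunnels' list in the response was empty when it was queried. Why it was empty is not shown in the output; one possibility, stated here only as an assumption, is that the tunnel had not been established yet. A more defensive way to read the response could look like the following sketch. The helper name `first_public_url` and the sample payloads are hypothetical; only the 'tunnels' and 'public_url' keys come from the cell above.

```python
import json


def first_public_url(payload: str):
    """Return the first tunnel's public URL, or None if no tunnel is listed."""
    tunnels = json.loads(payload).get("tunnels", [])
    if not tunnels:
        return None
    return tunnels[0].get("public_url")


# Hypothetical payloads shaped like the /api/tunnels response used above
empty_response = '{"tunnels": []}'
ready_response = '{"tunnels": [{"public_url": "https://example.ngrok.io"}]}'

print(first_public_url(empty_response))
print(first_public_url(ready_response))
```

Returning None instead of indexing blindly lets the caller retry after a short wait or report that no tunnel is available.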


## Data preprocessing

In this colab, rather than downloading a file from Google Drive, we will load a famous machine learning dataset, the **Breast Cancer Wisconsin dataset** using the scikit-learn dataset loader.

### About the dataset

Features are computed from a digitized image of a fine needle aspirate (FNA) of a breast mass. They describe characteristics of the cell nuclei.

**Attribute Information:**

Ten real-valued features are computed for each cell nucleus (3–32):

a) radius (mean of distances from the center to points on the perimeter)

b) texture (standard deviation of gray-scale values)

c) perimeter

d) area

e) smoothness (local variation in radius lengths)

f) compactness (perimeter² / area - 1.0)

g) concavity (severity of concave portions of the contour)

h) concave points (number of concave portions of the contour)

i) symmetry

j) fractal dimension ("coastline approximation" - 1)

### What is K-Means?
The algorithm we will use to perform segmentation analysis is K-Means clustering. K-Means is a partition-based algorithm that performs well on medium and large datasets. It is an unsupervised learning algorithm that uses similarities in the data to create groups without labels.
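
To make the idea concrete, here is a minimal NumPy sketch of the classic K-Means iteration (assign each point to its nearest centroid, then move each centroid to the mean of its points). It is an illustration only, not the implementation Spark uses; the function name `kmeans_sketch` and the toy data are made up for this example.

```python
import numpy as np


def kmeans_sketch(points, k, n_iter=100, seed=0):
    """Plain K-Means on an (n, d) array; returns (centroids, assignments)."""
    rng = np.random.default_rng(seed)
    # Start from k distinct data points chosen at random
    centroids = points[rng.choice(len(points), size=k, replace=False)]
    for _ in range(n_iter):
        # Squared Euclidean distance from every point to every centroid: shape (n, k)
        distances = ((points[:, None, :] - centroids[None, :, :]) ** 2).sum(axis=2)
        assignments = distances.argmin(axis=1)
        # Move each centroid to the mean of its points (keep it if the cluster is empty)
        new_centroids = np.array([
            points[assignments == j].mean(axis=0) if np.any(assignments == j) else centroids[j]
            for j in range(k)
        ])
        if np.allclose(new_centroids, centroids):
            break  # centroids stopped moving, so the assignments are stable
        centroids = new_centroids
    return centroids, assignments


# Toy data: two well-separated groups of three points each
toy_points = np.array([[0.0, 0.0], [0.2, 0.1], [0.1, 0.3],
                       [5.0, 5.0], [5.2, 4.9], [4.9, 5.1]])
centroids, assignments = kmeans_sketch(toy_points, k=2)
print(assignments)
```

On this toy data the first three points end up in one cluster and the last three in the other; which cluster gets index 0 depends on the random initialization.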



In [15]:
from sklearn.datasets import load_breast_cancer
breast_cancer = load_breast_cancer()

For convenience, given that the dataset is small, we first construct a pandas dataframe, tune the schema, and then convert it into a Spark dataframe.

While creating a PySpark DataFrame we can specify the structure using the StructType and StructField classes. StructType is a collection of StructFields, each of which defines the column name, the data type, and a flag for whether the column is nullable. Using StructField we can also add a nested struct schema, ArrayType for arrays, and MapType for key-value pairs.

In [16]:
from pyspark.ml.linalg import Vectors  # needed by the Vectors.dense call below

pd_df = pd.DataFrame(breast_cancer.data, columns=breast_cancer.feature_names)
df = spark.createDataFrame(pd_df)

# StructType and StructField are used to specify a DataFrame schema programmatically
# and to create complex columns such as nested struct, array, and map columns.
# A StructType is a collection of StructFields, each defining a column name,
# a data type, a nullable flag, and metadata.

def set_df_columns_nullable(spark, df, column_list, nullable=False):
    for struct_field in df.schema:  # each StructField holds the metadata of one column
        if struct_field.name in column_list:
            # nullable is not an enforced constraint; it reflects the source and type
            # semantics and enables certain optimizations. Here we state that the
            # columns contain no null values.
            struct_field.nullable = nullable
    df_mod = spark.createDataFrame(df.rdd, df.schema)
    return df_mod

df = set_df_columns_nullable(spark, df, df.columns)
df = df.withColumn('features', array(df.columns))
# Convert each row's 'features' array into a dense vector, giving an RDD of vectors
vectors = df.rdd.map(lambda row: Vectors.dense(row.features))

df.printSchema()

root
 |-- mean radius: double (nullable = false)
 |-- mean texture: double (nullable = false)
 |-- mean perimeter: double (nullable = false)
 |-- mean area: double (nullable = false)
 |-- mean smoothness: double (nullable = false)
 |-- mean compactness: double (nullable = false)
 |-- mean concavity: double (nullable = false)
 |-- mean concave points: double (nullable = false)
 |-- mean symmetry: double (nullable = false)
 |-- mean fractal dimension: double (nullable = false)
 |-- radius error: double (nullable = false)
 |-- texture error: double (nullable = false)
 |-- perimeter error: double (nullable = false)
 |-- area error: double (nullable = false)
 |-- smoothness error: double (nullable = false)
 |-- compactness error: double (nullable = false)
 |-- concavity error: double (nullable = false)
 |-- concave points error: double (nullable = false)
 |-- symmetry error: double (nullable = false)
 |-- fractal dimension error: double (nullable = false)
 |-- worst radius: double (nullable

With the next cell, we build the two data structures that we will be using throughout this Colab:
* features, a dataframe of dense vectors, containing all the original features in the dataset;
* labels, a series of binary labels indicating whether the corresponding set of features belongs to a subject with breast cancer or not (in scikit-learn's encoding, 0 is malignant and 1 is benign).

### What is a data structure?
A data structure is a format for storing and organizing data. It is a way of arranging data on a computer so that it can be accessed and updated efficiently.

In [17]:
from pyspark.ml.linalg import Vectors
features = spark.createDataFrame(vectors.map(Row),['features'])
labels = pd.Series(breast_cancer.target)

## KMeans
PySpark's KMeans, found in pyspark.ml.clustering, is an estimator for K-Means clustering, a type of unsupervised learning for data that comes without categories or groups. It groups the data points into clusters and assigns each point to one of them. The fitted model can then be used to predict the cluster of further data. KMeans reads its input from a single vector column, named features by default, which is often built with a vector assembler. Here we apply it to the feature vectors prepared above.

## Silhouette score
ClusteringEvaluator is an evaluator for clustering results, which expects two input columns: prediction and features. The metric computes the Silhouette measure using the squared Euclidean distance.

The Silhouette is a measure for the validation of the consistency within clusters. It ranges between 1 and -1, where a value close to 1 means that the points in a cluster are close to the other points in the same cluster and far from the points of the other clusters.

In [18]:
#importing necessary libraries for kmean algorithm and evaluating it
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator


In [22]:
# Trains a k-means model
kmeans = KMeans().setK(2).setSeed(1)  # k=2: the number of clusters to divide the data into

# With any k larger than 2, the silhouette score reported by ClusteringEvaluator()
# falls below 0.5, meaning the division is not clear.
# Another way to check the optimal number of clusters would be to plot an elbow curve.

model = kmeans.fit(features)  # kmeans.fit() trains the model on the feature vectors

In [23]:
# Make predictions
predictions = model.transform(features)  # adds a 'prediction' column with each row's cluster index

# Evaluate clustering by computing the silhouette score
evaluator = ClusteringEvaluator()

Silhouette score is used to evaluate the quality of clusters created using clustering algorithms such as K-Means in terms of how well samples are clustered with other samples that are similar to each other. The Silhouette score is calculated for each sample of different clusters.

* Silhouette score for a set of sample data points is used to measure how dense and well-separated the clusters are.
* Silhouette score takes into consideration the intra-cluster distance between the sample and other data points within the same cluster (a) and inter-cluster distance between the sample and the next nearest cluster (b).
* The silhouette score falls within the range [-1, 1].
* A silhouette score of 1 means that the clusters are very dense and nicely separated. A score of 0 means that the clusters are overlapping. A score of less than 0 means that samples may have been assigned to the wrong cluster.
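
For a single sample, the quantities a and b above combine into s = (b - a) / max(a, b), and the overall score is the mean of s over all samples. The following NumPy sketch computes it with squared Euclidean distances on toy data. It is a simple illustration under these assumptions, not the way Spark computes the metric internally; the function name `silhouette_sketch` and the data are made up, and at least two clusters are assumed.

```python
import numpy as np


def silhouette_sketch(points, labels):
    """Mean silhouette over all samples, using squared Euclidean distance."""
    points = np.asarray(points, dtype=float)
    labels = np.asarray(labels)
    # Pairwise squared Euclidean distances: shape (n, n)
    sq_dists = ((points[:, None, :] - points[None, :, :]) ** 2).sum(axis=2)
    scores = []
    for i in range(len(points)):
        same = labels == labels[i]
        same[i] = False  # exclude the sample itself from its own cluster
        if not same.any():
            scores.append(0.0)  # common convention: a singleton cluster scores 0
            continue
        a = sq_dists[i, same].mean()  # mean distance to the sample's own cluster
        b = min(sq_dists[i, labels == other].mean()  # nearest other cluster
                for other in np.unique(labels) if other != labels[i])
        scores.append((b - a) / max(a, b))
    return float(np.mean(scores))


toy_points = [[0.0, 0.0], [0.2, 0.1], [0.1, 0.3],
              [5.0, 5.0], [5.2, 4.9], [4.9, 5.1]]
good_score = silhouette_sketch(toy_points, [0, 0, 0, 1, 1, 1])  # labels match the groups
bad_score = silhouette_sketch(toy_points, [0, 1, 0, 1, 0, 1])   # labels mix the groups
print(good_score, bad_score)
```

The labelling that matches the two groups scores close to 1, while the mixed labelling scores much lower.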


In [24]:
silhouette = evaluator.evaluate(predictions)
print('Silhouette with squared euclidean distance= '+ str(silhouette))


Silhouette with squared euclidean distance= 0.8342904262826145


The silhouette score indicates that the clusters are dense and nicely separated, since its value is close to 1.

# _THANK YOU!_