<a href="https://colab.research.google.com/github/Disco-Gnome/CSC84030_Fall23/blob/main/4_clustering_%26_dimensionality_reduction_RGM.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Setup

Let's set up Spark in your Colab environment. Run the cell below!

In [4]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 316.9/316.9 MB 4.6 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=6bc4005e99e05816e8df003d1f70d2e1c67efbed8439108dc4c5432c9a5c4263
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0
The following additional packages will be installed:
  libxtst6 openjdk-8-jre-headless
Suggested packages:
  openjdk-8-demo openjdk-8-source libnss-mdns fonts-dejavu-extra fonts-nanum
  fonts-ipafont-gothic fonts-ipafont-mincho fonts-wqy-microhei
  fonts-wqy-zenhei fonts-indi

Now we import some of the libraries usually needed by our workload.

In [5]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

import pyspark
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf

Let's initialize the Spark context.

In [6]:
# configure Spark to serve its web UI on port 4050
conf = SparkConf().set("spark.ui.port", "4050")

# create the context, then a session that reuses it
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession.builder.getOrCreate()

### Data Preprocessing

In this Colab, we will load a famous machine learning dataset, the [Breast Cancer Wisconsin dataset](https://scikit-learn.org/stable/modules/generated/sklearn.datasets.load_breast_cancer.html), using the ```scikit-learn``` datasets loader.

In [7]:
from sklearn.datasets import load_breast_cancer
breast_cancer = load_breast_cancer()

For convenience, given that the dataset is small, we first construct a Pandas dataframe, tune the schema, and then convert it into a Spark dataframe.

In [8]:
pd_df = pd.DataFrame(breast_cancer.data, columns=breast_cancer.feature_names)
df = spark.createDataFrame(pd_df)

def set_df_columns_nullable(spark, df, column_list, nullable=False):
    for struct_field in df.schema:
        if struct_field.name in column_list:
            struct_field.nullable = nullable
    df_mod = spark.createDataFrame(df.rdd, df.schema)
    return df_mod

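df = set_df_columns_nullable(spark, df, df.columns)
# pack the 30 feature columns into one array column named 'features'
df = df.withColumn('features', array(df.columns))

from pyspark.ml.linalg import Vectors  # needed by the lambda below
vectors = df.rdd.map(lambda row: Vectors.dense(row.features))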

df.printSchema()

root
 |-- mean radius: double (nullable = false)
 |-- mean texture: double (nullable = false)
 |-- mean perimeter: double (nullable = false)
 |-- mean area: double (nullable = false)
 |-- mean smoothness: double (nullable = false)
 |-- mean compactness: double (nullable = false)
 |-- mean concavity: double (nullable = false)
 |-- mean concave points: double (nullable = false)
 |-- mean symmetry: double (nullable = false)
 |-- mean fractal dimension: double (nullable = false)
 |-- radius error: double (nullable = false)
 |-- texture error: double (nullable = false)
 |-- perimeter error: double (nullable = false)
 |-- area error: double (nullable = false)
 |-- smoothness error: double (nullable = false)
 |-- compactness error: double (nullable = false)
 |-- concavity error: double (nullable = false)
 |-- concave points error: double (nullable = false)
 |-- symmetry error: double (nullable = false)
 |-- fractal dimension error: double (nullable = false)
 |-- worst radius: double (nullable

With the next cell, we build the two data structures that we will use throughout this Colab:


*   ```features```, a dataframe with a single column of dense vectors (```DenseVector```), each holding the 30 original features of one sample;
*   ```labels```, a series of binary labels indicating whether the corresponding sample was diagnosed as malignant (```0```) or benign (```1```).



In [9]:
from pyspark.ml.linalg import Vectors
features = spark.createDataFrame(vectors.map(Row), ["features"])
labels = pd.Series(breast_cancer.target)

In [10]:
# For debugging!
# `features` is a DataFrame with a single column, also named `features`, of type vector:
# each row holds one DenseVector of doubles (numeric features, not strings).
print('features meta data: ', features)
features.show()  # show() prints the table itself and returns None, so don't wrap it in print()

features meta data:  DataFrame[features: vector]
+--------------------+
|            features|
+--------------------+
|[17.99,10.38,122....|
|[20.57,17.77,132....|
|[19.69,21.25,130....|
|[11.42,20.38,77.5...|
|[20.29,14.34,135....|
|[12.45,15.7,82.57...|
|[18.25,19.98,119....|
|[13.71,20.83,90.2...|
|[13.0,21.82,87.5,...|
|[12.46,24.04,83.9...|
|[16.02,23.24,102....|
|[15.78,17.89,103....|
|[19.17,24.8,132.4...|
|[15.85,23.95,103....|
|[13.73,22.61,93.6...|
|[14.54,27.54,96.7...|
|[14.68,20.13,94.7...|
|[16.13,20.68,108....|
|[19.81,22.15,130....|
|[13.54,14.36,87.4...|
+--------------------+
only showing top 20 rows


### Your task

If you successfully ran the Setup and Data Preprocessing stages, you are now ready to cluster the data with the [K-means](https://spark.apache.org/docs/latest/ml-clustering.html) algorithm included in MLlib (Spark's Machine Learning library).
Set the ```k``` parameter to **2**, fit the model, and then compute the [Silhouette score](https://en.wikipedia.org/wiki/Silhouette_(clustering)) (i.e., a measure of the quality of the obtained clustering).  

**IMPORTANT:** use the MLlib implementation of the Silhouette score (via ```ClusteringEvaluator```).
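Under the hood, K-means alternates two steps (Lloyd's algorithm): assign every point to its nearest center, then move every center to the mean of its assigned points, until the centers stop moving. The NumPy sketch below illustrates this on a small in-memory array. It is not MLlib's implementation: the function name ```kmeans_lloyd```, the ```max_iter``` cap and the deterministic farthest-point initialization are illustrative assumptions (MLlib initializes with k-means|| by default).

```python
import numpy as np


def kmeans_lloyd(X, k, max_iter=100):
    """Minimal Lloyd's k-means on an (n, d) array; returns (assignments, centers)."""
    X = np.asarray(X, dtype=float)

    def nearest(centers):
        # Squared Euclidean distance from every point to every center, shape (n, k)
        d2 = ((X[:, None, :] - centers[None, :, :]) ** 2).sum(axis=2)
        return d2.argmin(axis=1)

    # Farthest-point initialization (illustrative stand-in for k-means||):
    # start from the first row, then repeatedly add the row farthest from
    # its nearest already-chosen center.
    centers = [X[0]]
    for _ in range(1, k):
        d2 = np.min([((X - c) ** 2).sum(axis=1) for c in centers], axis=0)
        centers.append(X[np.argmax(d2)])
    centers = np.array(centers)

    for _ in range(max_iter):
        assign = nearest(centers)                      # assignment step
        new_centers = np.array([                       # update step
            X[assign == j].mean(axis=0) if np.any(assign == j) else centers[j]
            for j in range(k)                          # empty cluster keeps its center
        ])
        if np.allclose(new_centers, centers):          # converged
            break
        centers = new_centers

    # Final assignment consistent with the returned centers
    return nearest(centers), centers
```

In the Spark cell, ```KMeans().setK(2).setSeed(1)``` plays the same role, and ```model.transform``` adds the id of each point's nearest center as the ```prediction``` column.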

In [11]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Code referenced from: https://spark.apache.org/docs/latest/ml-clustering.html#output-columns

# Trains a k-means model.
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(df)

# Make predictions
predictions = model.transform(df)

# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator()

silhouette = evaluator.evaluate(predictions)

In [12]:
print("labels variable (i.e., the ground truth from our dataset) =\n", labels)
print("Silhouette with squared euclidean distance = ", silhouette)

labels variable (i.e., the ground truth from our dataset) =
 0      0
1      0
2      0
3      0
4      0
      ..
564    0
565    0
566    0
567    0
568    1
Length: 569, dtype: int64
Silhouette with squared euclidean distance =  0.8342904262826145
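For reference, the silhouette of a point $i$ is $s(i) = \frac{b(i) - a(i)}{\max(a(i), b(i))}$, where $a(i)$ is its mean distance to the other points of its own cluster and $b(i)$ is its smallest mean distance to the points of any other cluster. The reported score is the mean of $s(i)$ over all points, so values close to 1 indicate compact, well-separated clusters. The NumPy sketch below computes this with squared Euclidean distance, the distance named in the output above. It is an O(n²) illustration that assumes at least two clusters, not MLlib's distributed implementation, and ```silhouette_sq_euclidean``` is a hypothetical helper name.

```python
import numpy as np


def silhouette_sq_euclidean(X, labels):
    """Mean silhouette using squared Euclidean distance (illustrative sketch)."""
    X = np.asarray(X, dtype=float)
    labels = np.asarray(labels)
    # Pairwise squared Euclidean distances, shape (n, n)
    D = ((X[:, None, :] - X[None, :, :]) ** 2).sum(axis=2)
    clusters = np.unique(labels)
    scores = np.zeros(len(X))
    for i in range(len(X)):
        own = labels == labels[i]
        n_own = own.sum()
        if n_own == 1:
            continue  # singleton cluster: silhouette is taken as 0
        # a(i): mean distance to the other members of its own cluster
        # (D[i, i] is 0, so dividing the sum by n_own - 1 excludes i itself)
        a = D[i, own].sum() / (n_own - 1)
        # b(i): smallest mean distance to the members of another cluster
        b = min(D[i, labels == c].mean() for c in clusters if c != labels[i])
        scores[i] = (b - a) / max(a, b) if max(a, b) > 0 else 0.0
    return scores.mean()
```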


Take the predictions produced by K-means and compare them with the ```labels``` variable (i.e., the ground truth from our dataset).  

Compute how many data points in the dataset have been clustered correctly (i.e., positive cases in one cluster, negative cases in the other).

*HINT*: you can use ```np.count_nonzero(series_a == series_b)``` to quickly compute the element-wise comparison of two series.

**IMPORTANT**: K-means is a clustering algorithm, so it will not output a label for each data point, but just a cluster identifier!  As such, label ```0``` does not necessarily match the cluster identifier ```0```.
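Because cluster ids are arbitrary, one way to count correctly clustered points is to try every one-to-one mapping from cluster id to label and keep the mapping with the most matches. A minimal sketch, assuming labels and cluster ids are both integers in ```0..k-1``` (true for ```labels``` here and for Spark's ```prediction``` column); ```count_correct``` is a hypothetical helper name:

```python
from itertools import permutations

import numpy as np


def count_correct(labels, clusters, k=2):
    """Points clustered 'correctly' under the best cluster-id -> label mapping."""
    labels = np.asarray(labels)
    clusters = np.asarray(clusters)
    best = 0
    # Try every one-to-one mapping from cluster id to label; keep the best score
    for mapping in permutations(range(k)):
        mapped = np.asarray(mapping)[clusters]
        best = max(best, int(np.count_nonzero(mapped == labels)))
    return best
```

For ```k = 2``` this reduces to ```max(matches, len(labels) - matches)```, where ```matches``` is the direct element-wise count from the hint. Enumerating all k! mappings is only practical for small ```k```.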


In [13]:
# 'prediction' is the right column: it holds the cluster id (0 or 1) of each row.
# Cluster ids are arbitrary, so the mapping to labels may be flipped: with 569 points,
# 83 direct matches means 569 - 83 = 486 points agree under the swapped mapping.

In [14]:
prediction_series = (predictions.select('prediction').rdd.map(lambda x: x[0]).collect())
np.count_nonzero(labels == prediction_series)

83

Now perform dimensionality reduction on the ```features``` using [SVD](https://spark.apache.org/docs/latest/mllib-dimensionality-reduction), also available in MLlib.

Reduce the dimensionality to **2**, effectively reducing the dataset size by a factor of **15** (from 30 features to 2). Name the new dataset ```svdFeatures```.
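The rank-2 SVD factors the 569 × 30 feature matrix $A$ as $A \approx U \Sigma V^T$, with $U$ of size 569 × 2, $\Sigma$ a 2 × 2 diagonal matrix of singular values and $V$ of size 30 × 2. The reduced dataset is $U\Sigma = AV$: each sample's coordinates along the top two right singular vectors, so 30 columns shrink to 2. Below is a NumPy sketch of the same computation on an in-memory array, assuming (like ```RowMatrix.computeSVD```) that the data is not mean-centered first; ```svd_reduce``` is a hypothetical helper name.

```python
import numpy as np


def svd_reduce(A, k):
    """Project the rows of A onto their top-k right singular vectors.

    Returns (U_k * s_k, V_k); U_k * s_k equals A @ V_k. No mean-centering is done.
    """
    U, s, Vt = np.linalg.svd(np.asarray(A, dtype=float), full_matrices=False)
    U_k, s_k, V_k = U[:, :k], s[:k], Vt[:k].T
    reduced = U_k * s_k  # scales column j of U_k by s_k[j], i.e. U_k @ np.diag(s_k)
    return reduced, V_k
```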

In [15]:
from pyspark.mllib.linalg.distributed import RowMatrix

# Referenced from: https://stackoverflow.com/a/53066752
# `features` is a DataFrame; RowMatrix needs an RDD of vectors, so extract
# each row's DenseVector as a NumPy array
featuresRDD = features.rdd.map(lambda row: row.features.toArray())
featuresRowMatrix = RowMatrix(featuresRDD)
# Referenced from: https://spark.apache.org/docs/latest/mllib-dimensionality-reduction#svd-example
svdFeatures = featuresRowMatrix.computeSVD(2, computeU=True)
print('The U factor is a RowMatrix: \n', svdFeatures.U)
print('The singular values are stored in a local dense vector: \n', svdFeatures.s)
print('The V factor is a local dense matrix:')
print(svdFeatures.V)

The U factor is a RowMatrix: 
 <pyspark.mllib.linalg.distributed.RowMatrix object at 0x7d9f8a125f00>
The singular values are stored in a local dense vector: 
 [30786.444627835797,2480.4457833852953]
The V factor is a local dense matrix:
DenseMatrix([[ 1.07417853e-02,  3.10857421e-02],
             [ 1.34045777e-02,  4.83124253e-02],
             [ 7.04506088e-02,  1.97364828e-01],
             [ 5.72522445e-01,  7.70224130e-01],
             [ 6.51751678e-05,  2.62097172e-04],
             [ 8.01017182e-05,  1.75341873e-04],
             [ 8.07639302e-05,  4.07661495e-05],
             [ 4.51934799e-05,  1.69018436e-05],
             [ 1.22298430e-04,  4.97684199e-04],
             [ 4.10463505e-05,  1.84499403e-04],
             [ 3.52395083e-04,  8.41315666e-05],
             [ 7.91756415e-04,  4.03047670e-03],
             [ 2.49763979e-03,  1.33910533e-03],
             [ 4.15710660e-02, -6.79661807e-02],
             [ 4.46726431e-06,  2.42657052e-05],
             [ 1.84034785e-0

In [16]:
# # Convert svd.U to dataframe
# svdFeatUrows = svdFeatures.U.rows.map(lambda x: (x[0],x[1])).collect()
# reduced_df = spark.createDataFrame(pd.DataFrame(svdFeatUrows))
# # Set nullable to prevent error in kMeans
# # https://stackoverflow.com/questions/55162989/pyspark-kmeans-clustering-features-column-illegalargumentexception
# reduced_df = set_df_columns_nullable(spark, reduced_df, reduced_df.columns)

Now run K-means with the same parameters as above, but on the ```svdFeatures``` produced by the SVD reduction you just executed.

Compute the Silhouette score, as well as the number of data points that have been clustered correctly.

In [100]:
from pyspark.mllib.linalg import DenseMatrix

# The reduced dataset is U·Σ (equivalently A·V): each row holds the
# 2 SVD coordinates of one sample.
u_factor = svdFeatures.U  # RowMatrix, one row per sample

# Build Σ as a 2x2 diagonal DenseMatrix from the singular values
# (DenseMatrix is column-major; for a diagonal matrix the order does not matter)
s = svdFeatures.s.toArray()
s_factor_DenseMatrix = DenseMatrix(2, 2, [s[0], 0.0, 0.0, s[1]])

# Multiply U by Σ
# https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.mllib.linalg.distributed.RowMatrix.html#pyspark.mllib.linalg.distributed.RowMatrix.multiply
reduced_matrix = u_factor.multiply(s_factor_DenseMatrix)

# Convert to a pandas DataFrame, then to a Spark DataFrame
reduced_df = spark.createDataFrame(pd.DataFrame(reduced_matrix.rows.collect()))

print("U: ", u_factor.rows.collect(), "\n")
print("Σ: ", s_factor_DenseMatrix, "\n")
print("Final reduced data: ", reduced_df.collect())


U:  [DenseVector([0.0728, -0.1402]), DenseVector([0.0771, -0.0229]), DenseVector([0.0683, -0.0048]), DenseVector([0.0227, 0.0009]), DenseVector([0.0665, 0.0555]), DenseVector([0.0289, -0.0108]), DenseVector([0.0625, -0.0312]), DenseVector([0.035, -0.0149]), DenseVector([0.0297, 0.004]), DenseVector([0.0281, -0.0037]), DenseVector([0.0458, -0.0031]), DenseVector([0.0495, -0.0427]), DenseVector([0.0569, 0.0576]), DenseVector([0.0383, 0.0551]), DenseVector([0.0297, 0.0326]), DenseVector([0.0377, 0.0017]), DenseVector([0.0434, -0.0364]), DenseVector([0.0502, -0.0403]), DenseVector([0.0876, -0.1454]), DenseVector([0.0298, 0.0241]), DenseVector([0.0268, 0.0284]), DenseVector([0.0137, 0.0211]), DenseVector([0.0396, 0.0066]), DenseVector([0.096, -0.1498]), DenseVector([0.0761, -0.2152]), DenseVector([0.0563, -0.0392]), DenseVector([0.0362, 0.0081]), DenseVector([0.0581, 0.031]), DenseVector([0.0478, -0.0495]), DenseVector([0.0508, 0.0285]), DenseVector([0.0651, -0.0314]), DenseVector([0.0322, 

In [78]:
# Mark the columns non-nullable so that array() below produces array<double> with
# containsNull = false, the array type KMeans accepts as a features column
# https://stackoverflow.com/questions/55162989/pyspark-kmeans-clustering-features-column-illegalargumentexception
reduced_df = set_df_columns_nullable(spark, reduced_df, reduced_df.columns)

# Pack the two SVD coordinates into one array column named 'features',
# the column KMeans reads by default (featuresCol='features')
reduced_df = reduced_df.withColumn('features', array(reduced_df.columns))

In [89]:
##### DISCARDED CODE #####
# Converting the SVD DenseMatrix to a DataFrame bc .fit() needs DF https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.clustering.KMeans.html#pyspark.ml.clustering.KMeans.fit
# svdFeaturesDenseMatrix = svdFeatures.V
# https://stackoverflow.com/a/54106020
# denseMatrixToDataFrame = spark.createDataFrame(
#     svdFeaturesDenseMatrix.toArray().tolist()
# )
##########################


# Code referenced from: https://spark.apache.org/docs/latest/ml-clustering.html#output-columns

# Trains a k-means model.
svd_reduced_kmeans = KMeans().setK(2).setSeed(1)
svd_reduced_model = svd_reduced_kmeans.fit(reduced_df)

# Make predictions
reduced_predictions = svd_reduced_model.transform(reduced_df)

# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator()

reduced_silhouette = evaluator.evaluate(reduced_predictions)

In [90]:
print("Reduced data Silhouette with squared euclidean distance = ", reduced_silhouette)
print("Original data Silhouette with squared euclidean distance = ", silhouette)

Reduced data Silhouette with squared euclidean distance =  0.8351779819006669
Original data Silhouette with squared euclidean distance =  0.8342904262826145


In [94]:
reduced_prediction_series = (reduced_predictions.select('prediction').rdd.map(lambda x: x[0]).collect())
print("Reduced data comparison: ", np.count_nonzero(labels == reduced_prediction_series))
print("Original data comparison: ", np.count_nonzero(labels == prediction_series))

Reduced data comparison:  83
Original data comparison:  83


#### **Submission Instructions:**

#### Click File -> Download -> Download .ipynb, and upload the downloaded file to Blackboard.