
# CS246 - Colab 3
## K-Means & PCA

### Setup

Let's set up Spark in your Colab environment. Run the cell below!

In [1]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=a38b65a13a294ced46c2c536106d382f837ab6b58cb63a9d88f0094fed7561f9
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1
The following additional packages will be installed:
  libxtst6 openjdk-8-jre-headless
Suggested packages:
  openjdk-8-demo openjdk-8-source libnss-mdns fonts-dejavu-extra fonts-nanum fonts-ipafont-gothic
  fonts-ipafont-mincho fonts-wqy-microhei fonts-wqy-zenhei fonts-indic

Now we import some of the libraries usually needed by our workload.





In [2]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

import pyspark
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf

Let's initialize the Spark context.

In [3]:
# configure Spark
conf = SparkConf().set("spark.ui.port", "4050")

# create the context and the session
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession.builder.getOrCreate()

### Data Preprocessing

In this Colab, rather than downloading a file from Google Drive, we will load a famous machine learning dataset, the [Breast Cancer Wisconsin dataset](https://scikit-learn.org/stable/modules/generated/sklearn.datasets.load_breast_cancer.html), using the ```scikit-learn``` datasets loader.

In [4]:
from sklearn.datasets import load_breast_cancer
breast_cancer = load_breast_cancer()

For convenience, given that the dataset is small, we first

*   construct a Pandas dataframe
*   adjust the schema so that every column is non-nullable
*   and convert it into a Spark dataframe.

In [5]:
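from pyspark.ml.linalg import Vectors  # used by the lambda at the end of this cell

# build a Pandas dataframe from the raw feature matrix, then a Spark dataframe
pd_df = pd.DataFrame(breast_cancer.data, columns=breast_cancer.feature_names)
df = spark.createDataFrame(pd_df)

def set_df_columns_nullable(spark, df, column_list, nullable=False):
    # update the nullable flag of the listed columns and rebuild the dataframe
    for struct_field in df.schema:
        if struct_field.name in column_list:
            struct_field.nullable = nullable
    df_mod = spark.createDataFrame(df.rdd, df.schema)
    return df_mod

df = set_df_columns_nullable(spark, df, df.columns)
# pack all feature columns into one array column, then map each row to a dense vector
df = df.withColumn('features', array(df.columns))
vectors = df.rdd.map(lambda row: Vectors.dense(row.features))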

df.printSchema()

root
 |-- mean radius: double (nullable = false)
 |-- mean texture: double (nullable = false)
 |-- mean perimeter: double (nullable = false)
 |-- mean area: double (nullable = false)
 |-- mean smoothness: double (nullable = false)
 |-- mean compactness: double (nullable = false)
 |-- mean concavity: double (nullable = false)
 |-- mean concave points: double (nullable = false)
 |-- mean symmetry: double (nullable = false)
 |-- mean fractal dimension: double (nullable = false)
 |-- radius error: double (nullable = false)
 |-- texture error: double (nullable = false)
 |-- perimeter error: double (nullable = false)
 |-- area error: double (nullable = false)
 |-- smoothness error: double (nullable = false)
 |-- compactness error: double (nullable = false)
 |-- concavity error: double (nullable = false)
 |-- concave points error: double (nullable = false)
 |-- symmetry error: double (nullable = false)
 |-- fractal dimension error: double (nullable = false)
 |-- worst radius: double (nullable

With the next cell, we build the two data structures that we will be using throughout this Colab:


*   ```features```, a dataframe of Dense vectors, containing all the original features in the dataset;
*   ```labels```, a Pandas series of binary labels indicating whether the corresponding set of features belongs to a malignant or a benign tumor (in the scikit-learn encoding, ```0``` is malignant and ```1``` is benign).



In [6]:
from pyspark.ml.linalg import Vectors
features = spark.createDataFrame(vectors.map(Row), ["features"])
labels = pd.Series(breast_cancer.target)

In [37]:
features

DataFrame[features: vector]

### Your task

If you ran the Setup and Data Preprocessing stages successfully, you are now ready to cluster the data with the [K-means](https://spark.apache.org/docs/latest/ml-clustering.html) algorithm included in MLlib (Spark's Machine Learning library).
Set the ```k``` parameter to **2** and the seed to **1**, fit the model, and then compute the [Silhouette score](https://en.wikipedia.org/wiki/Silhouette_(clustering)) (a measure of the quality of the obtained clustering; here we use the squared Euclidean distance).
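To make the roles of ```k``` and the seed concrete, here is a minimal pure-Python sketch of Lloyd's iteration on a toy dataset. It is an illustration only, not MLlib's implementation: the helper names (```sq_dist```, ```assign```, ```toy_kmeans```) and the toy points are assumptions made for this example, and the initial centers are sampled at random here, whereas MLlib uses k-means|| initialization by default.

```python
import random

def sq_dist(p, q):
    """Squared Euclidean distance between two equal-length tuples."""
    return sum((a - b) ** 2 for a, b in zip(p, q))

def assign(points, centers):
    """Index of the nearest center for every point."""
    return [min(range(len(centers)), key=lambda j: sq_dist(p, centers[j]))
            for p in points]

def toy_kmeans(points, k, seed, iterations=20):
    """Hypothetical helper: plain Lloyd's iteration with seeded random init."""
    rng = random.Random(seed)        # the seed makes the run reproducible
    centers = rng.sample(points, k)  # pick k distinct points as initial centers
    for _ in range(iterations):
        assignments = assign(points, centers)
        for j in range(k):
            members = [p for p, a in zip(points, assignments) if a == j]
            if members:  # keep the old center if a cluster ends up empty
                centers[j] = tuple(sum(c) / len(members) for c in zip(*members))
    return centers, assign(points, centers)

# two well-separated groups of three points each (toy data, not the real dataset)
toy_points = [(0.0, 0.0), (0.0, 1.0), (1.0, 0.0),
              (10.0, 10.0), (10.0, 11.0), (11.0, 10.0)]
toy_centers, toy_assignments = toy_kmeans(toy_points, k=2, seed=1)
print(toy_assignments)
```

The cluster ids themselves are arbitrary: which group is called ```0``` and which ```1``` depends on the initialization, which is why a fixed seed matters for reproducibility.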

**IMPORTANT:** use the MLlib implementation of the Silhouette score (via ```ClusteringEvaluator```).
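For intuition about what the score measures, the following pure-Python sketch computes a Silhouette score with squared Euclidean distance on a tiny toy dataset. It is meant only to illustrate the definition and is not a replacement for ```ClusteringEvaluator```; the function names and the toy data are assumptions made for this example.

```python
def sq_dist(p, q):
    """Squared Euclidean distance between two equal-length tuples."""
    return sum((a - b) ** 2 for a, b in zip(p, q))

def toy_silhouette(points, cluster_ids):
    """Mean silhouette coefficient, using squared Euclidean dissimilarity."""
    clusters = {}
    for p, c in zip(points, cluster_ids):
        clusters.setdefault(c, []).append(p)

    scores = []
    for p, c in zip(points, cluster_ids):
        own = clusters[c]
        if len(own) == 1:
            scores.append(0.0)  # convention for singleton clusters
            continue
        # a: mean dissimilarity to the other points of the same cluster
        # (the point's distance to itself is 0, so it does not affect the sum)
        a = sum(sq_dist(p, q) for q in own) / (len(own) - 1)
        # b: smallest mean dissimilarity to the points of any other cluster
        b = min(sum(sq_dist(p, q) for q in other) / len(other)
                for other_id, other in clusters.items() if other_id != c)
        scores.append(0.0 if max(a, b) == 0 else (b - a) / max(a, b))
    return sum(scores) / len(scores)

toy_points = [(0.0,), (1.0,), (10.0,), (11.0,)]
good_score = toy_silhouette(toy_points, [0, 0, 1, 1])  # natural grouping
bad_score = toy_silhouette(toy_points, [0, 1, 0, 1])   # mixed grouping
print(good_score, bad_score)
```

A score close to 1 means points are much closer to their own cluster than to the nearest other cluster; the mixed grouping scores lower than the natural one.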

In [10]:
''' 8-9 lines of code in total expected but can differ based on your style.
for sub-parts of the question, creating different cells of code would be recommended.
The running time should be less than 1 minute'''
# YOUR CODE HERE
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

k = 2
seed = 1

kmeans = KMeans(k=k, seed=seed)

model = kmeans.fit(features)

predictions = model.transform(features)

evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions.select('features', 'prediction'))

print("Silhouette with squared euclidean distance = " + str(silhouette))

Silhouette with squared euclidean distance = 0.8342904262826145


Take the predictions produced by K-means, and compare them with the ```labels``` variable (i.e., the ground truth from our dataset).  

Compute how many data points in the dataset have been clustered correctly (i.e., positive cases in one cluster, negative cases in the other). Use the best-case scenario, since the output cluster ids can be any permutation of the labels.

*HINT*: you can use ```np.count_nonzero(series_a == series_b)``` to quickly compute the element-wise comparison of two series.

**IMPORTANT**: K-means is a clustering algorithm, so it will not output a label for each data point, but just a cluster identifier!  As such, label ```0``` does not necessarily match the cluster identifier ```0```.
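The "best case" can be sketched as follows: try every one-to-one assignment of cluster ids to labels and keep the one with the most matches. This is a small pure-Python illustration on made-up data; the function name ```best_case_correct``` is hypothetical, and it assumes there are no more clusters than distinct labels (true here, with two of each).

```python
from itertools import permutations

def best_case_correct(cluster_ids, labels):
    """Largest number of matches over all one-to-one cluster-to-label mappings."""
    cluster_values = sorted(set(cluster_ids))
    label_values = sorted(set(labels))
    best = 0
    for perm in permutations(label_values, len(cluster_values)):
        mapping = dict(zip(cluster_values, perm))  # e.g. {0: 1, 1: 0}
        correct = sum(1 for c, y in zip(cluster_ids, labels) if mapping[c] == y)
        best = max(best, correct)
    return best

# toy example: cluster 0 mostly holds label 1, cluster 1 only holds label 0
toy_clusters = [0, 0, 0, 1, 1, 1]
toy_labels = [1, 1, 0, 0, 0, 0]
print(best_case_correct(toy_clusters, toy_labels))
```

With two clusters there are only two mappings to compare, so the same result can be obtained by counting matches for the predictions as-is and for the flipped predictions, and taking the larger count.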


In [22]:
''' 4 lines of code in total expected but can differ based on your style. for sub-parts of the question, creating different cells of code would be recommended.'''
# YOUR CODE HERE
from pyspark.sql.functions import pandas_udf, PandasUDFType

labels_spark_df = spark.createDataFrame(pd.DataFrame({'label': labels}))

# Attach a row index to both dataframes so that they can be joined row by row.
# NOTE: this assumes both dataframes are partitioned identically, because
# monotonically_increasing_id() depends on the partition layout.
predictions_with_index = predictions.withColumn("temp_index", monotonically_increasing_id())
labels_with_index = labels_spark_df.withColumn("temp_index", monotonically_increasing_id())

joined_data = predictions_with_index.join(labels_with_index, on="temp_index")

joined_data = joined_data.drop("temp_index")

# Pandas UDF that returns the most frequent label within each cluster
@pandas_udf('int', PandasUDFType.GROUPED_AGG)
def most_common_label(labels):
    return labels.mode().iloc[0]

# compute the most frequent label for each cluster
mapping = joined_data.groupBy('prediction').agg(most_common_label(col('label')).alias('mapped_label'))
mapping.take(2)

[Row(prediction=0, mapped_label=1), Row(prediction=1, mapped_label=0)]

In [23]:
# join the cluster-to-label mapping back onto joined_data
joined_data = joined_data.join(mapping, on='prediction')

# count the correctly clustered data points
correctly_clustered = joined_data.filter(joined_data.label == joined_data.mapped_label).count()

# print the result
print(f"Number of correctly clustered data points: {correctly_clustered}")

Number of correctly clustered data points: 486


In [26]:
correctly_clustered/joined_data.count()

0.8541300527240774

Now perform dimensionality reduction on the ```features``` using the [PCA](https://spark.apache.org/docs/latest/ml-features.html#pca) statistical procedure, which is also available in MLlib.

Set the ```k``` parameter to **2**, effectively reducing the dataset size by a factor of **15** (from 30 features per data point down to 2).
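As a rough illustration of what PCA does, the sketch below reduces toy 2-D points to 1-D by projecting them onto their direction of maximum variance. It uses the closed-form principal axis of a 2x2 covariance matrix and centers the data before projecting; both choices, the helper names, and the toy data are assumptions made for this example, and it is not how MLlib computes its 30-to-2 reduction.

```python
import math

def first_principal_axis(points):
    """Mean and unit direction of maximum variance for a list of 2-D points."""
    n = len(points)
    mean_x = sum(x for x, _ in points) / n
    mean_y = sum(y for _, y in points) / n
    # entries of the 2x2 sample covariance matrix
    cxx = sum((x - mean_x) ** 2 for x, _ in points) / (n - 1)
    cyy = sum((y - mean_y) ** 2 for _, y in points) / (n - 1)
    cxy = sum((x - mean_x) * (y - mean_y) for x, y in points) / (n - 1)
    # angle of the leading eigenvector of [[cxx, cxy], [cxy, cyy]]
    theta = 0.5 * math.atan2(2 * cxy, cxx - cyy)
    return (mean_x, mean_y), (math.cos(theta), math.sin(theta))

def project_to_1d(points):
    """Coordinate of each centered point along the first principal axis."""
    (mean_x, mean_y), (ux, uy) = first_principal_axis(points)
    return [(x - mean_x) * ux + (y - mean_y) * uy for x, y in points]

# toy points lying exactly on the line y = 2x
toy_points = [(1.0, 2.0), (2.0, 4.0), (3.0, 6.0), (4.0, 8.0)]
_, toy_axis = first_principal_axis(toy_points)
toy_projection = project_to_1d(toy_points)
print(toy_axis)
print(toy_projection)
```

Because the toy points lie on a line, a single coordinate preserves all of their variance; on real data, the discarded components carry the variance that is lost.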

In [30]:
''' 6 lines of code in total expected but can differ based on your style. for sub-parts of the question, creating different cells of code would be recommended.
The running time should be less than 30 seconds. Sanity check: the fourth row in the result should be [-692.6905100570509,38.57692259208171]'''
# YOUR CODE HERE
from pyspark.ml.feature import PCA

# create the PCA estimator with k=2
pca = PCA(k=2, inputCol='features', outputCol='pca_features')

# fit the PCA model
model = pca.fit(features)

# project the data with the fitted model
pca_results = model.transform(features)

# show the first few rows
pca_results.select('pca_features').show(5, truncate=False)

+----------------------------------------+
|pca_features                            |
+----------------------------------------+
|[-2260.013886292542,-187.96030122263687]|
|[-2368.9937557820544,121.58742425815537]|
|[-2095.6652015478608,145.11398565870124]|
|[-692.6905100570509,38.57692259208171]  |
|[-2030.2124927427067,295.2979839927931] |
+----------------------------------------+
only showing top 5 rows



In [56]:
pca_features = pca_results.select('pca_features')

In [57]:
pca_features

DataFrame[pca_features: vector]

Now run K-means with the same parameters as above, but on the ```pca_features``` produced by the PCA reduction you just executed.

Compute the Silhouette score, as well as the number of data points that have been clustered correctly (again using the best-case scenario).

In [67]:
''' 11-13 lines of code in total expected but can differ based on your style. for sub-parts of the question, creating different cells of code would be recommended.'''
# YOUR CODE HERE
# re-run K-means clustering on the PCA-reduced features
k = 2
seed = 1
kmeans_pca = KMeans(k=k, seed=seed).setFeaturesCol('pca_features')
model = kmeans_pca.fit(pca_features)

# predict the cluster of each data point
predictions_pca = model.transform(pca_features)

In [68]:
# compute the Silhouette score
evaluator = ClusteringEvaluator().setFeaturesCol('pca_features')
silhouette = evaluator.evaluate(predictions_pca.select('pca_features', 'prediction'))

print("Silhouette with squared euclidean distance = " + str(silhouette))

Silhouette with squared euclidean distance = 0.8348610363444832


In [69]:
predictions_with_index = predictions_pca.withColumn("temp_index", monotonically_increasing_id())
labels_with_index = labels_spark_df.withColumn("temp_index", monotonically_increasing_id())

joined_data = predictions_with_index.join(labels_with_index, on="temp_index")

joined_data = joined_data.drop("temp_index")

# Pandas UDF that returns the most frequent label within each cluster
@pandas_udf('int', PandasUDFType.GROUPED_AGG)
def most_common_label(labels):
    return labels.mode().iloc[0]

# compute the most frequent label for each cluster
mapping = joined_data.groupBy('prediction').agg(most_common_label(col('label')).alias('mapped_label'))
mapping.take(2)



[Row(prediction=0, mapped_label=1), Row(prediction=1, mapped_label=0)]

In [70]:
# join the cluster-to-label mapping back onto joined_data
joined_data = joined_data.join(mapping, on='prediction')

# count the correctly clustered data points
correctly_clustered = joined_data.filter(joined_data.label == joined_data.mapped_label).count()

# print the result
print(f"Number of correctly clustered data points: {correctly_clustered}")

Number of correctly clustered data points: 486


Once you have obtained the desired results, **head over to Gradescope and submit your solution for this Colab**!