# <span style="color:#fa04d9"><center>K-MEANS CLUSTERING USING APACHE SPARK</center></span>

### Many machine learning clustering examples are available online, built on scikit-learn or other packages. This notebook shows how to run the k-means algorithm on randomly generated two-dimensional data using the Apache Spark implementation. 

### After covering the first basic example with generic data, we will subsequently look at a second example on more realistic customer data.

# <span style="color:#fa04d9"><center>PART 1: GENERIC EXAMPLE</center></span>

# <span style="color:#fa04d9">**Step 1: Import and declare a few variables which will be used in the subsequent cells**</span>

In [None]:
from __future__ import print_function

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from sklearn.datasets import make_blobs
from pyspark import SparkContext
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SQLContext

%matplotlib inline

import pyspark  # the module itself must be imported for pyspark.__version__ to resolve
print(pyspark.__version__)

# <span style="color:#fa04d9">**Step 2: Create the generic data set using the scikit-learn make_blobs function**</span>

### The input parameters in the cell below have the following meanings:
1- num_samples: Total number of data points to generate.<br>
2- num_features: Number of dimensions of each data point. For example, num_features = 2 means that each point has two coordinates, x and y.

The **make_blobs** method will take one more parameter indicating the number of clusters which we would like to use. The total number of datapoints to be generated will be distributed around the center point of those clusters.
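
To make the role of these parameters concrete, here is a plain-Python sketch of the idea behind blob generation: choose one center per cluster, then scatter points around each center with Gaussian noise. It is an illustration only and not scikit-learn's implementation; the helper name `sketch_blobs`, the `spread` parameter and the coordinate range are assumptions made for this example.

```python
import random

def sketch_blobs(num_samples, num_features, num_clusters, spread=1.0, seed=42):
    """Hypothetical helper: scatter points around randomly chosen cluster centers."""
    rng = random.Random(seed)
    # One random center per cluster, each with num_features coordinates
    centers = [[rng.uniform(-10, 10) for _ in range(num_features)]
               for _ in range(num_clusters)]
    points, labels = [], []
    for i in range(num_samples):
        label = i % num_clusters              # distribute the points evenly over the clusters
        # Each coordinate is the center coordinate plus Gaussian noise
        points.append([rng.gauss(c, spread) for c in centers[label]])
        labels.append(label)
    return points, labels

points, labels = sketch_blobs(num_samples=12, num_features=2, num_clusters=3)
print(len(points), len(points[0]), sorted(set(labels)))
```

The two returned lists play the same roles as the two arrays returned by make_blobs: the coordinates of each point, and the cluster each point was generated from.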

In [None]:
#n_samples=10000
#n_features=3
num_samples = 3000 #Total number of points
num_features = 2 #2D datapoints
num_clusters = 6
X, y = make_blobs(n_samples=num_samples, centers=num_clusters, n_features=num_features, random_state=42)

### The make_blobs method returns two arrays, which we will label X and y:
1- X is the array of all num_samples data points which were generated.<br>
2- y is a corresponding array giving the cluster to which the data point at the same offset in X belongs. For example, the first entry in X holds the coordinates of a data point, and the first entry in y is the cluster that this data point belongs to.

### We can take a quick look at those two arrays X and y to get a feel for their content.

In [None]:
print(X)

In [None]:
print (y)

### In the next few steps, we are going to convert those two arrays X and y to a dataframe which will allow us to later bring the data into a machine learning algorithm

1- Create a dataframe named dfpandas with two columns 'x' and 'y' which will contain the coordinates of the points from the original array.<br>
2- Add to the dataframe a third column named 'id'. This column will have the keyword 'row' followed by the index of the current row, thereby uniquely identifying each row.

In [None]:
# add a row index as a string
dfpandas = pd.DataFrame(X, columns=['x', 'y'])
dfpandas['id'] = 'row'+dfpandas.index.astype(str)

In [None]:
dfpandas.head()

### Now move the id column to the front (left) of the dataframe.

In [None]:
cols = list(dfpandas)

In [None]:
cols

In [None]:
cols = list(dfpandas)
cols.insert(0, cols.pop(cols.index('id')))
dfpandas = dfpandas.loc[:, cols]  # .ix was removed from pandas; .loc selects by label
dfpandas.head()

In [None]:
#If desired, we can save the data as a local csv file and reload it later. In the current example, we will directly convert the pandas dataframe to a Spark one.
# save the ndarray as a csv file
#dfpandas.to_csv('input.csv', index=False)
#!cat input.csv

In [None]:
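# add_subplot replaces gca(projection=...), which recent matplotlib releases no longer accept
myplot = plt.figure(figsize=(12,10)).add_subplot(projection='rectilinear')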
myplot.scatter(X[:,0], X[:,1], c=y)
myplot.set_xlabel('x')
myplot.set_ylabel('y')
plt.show()

In [None]:
FEATURES_COL = ['x', 'y']
path = 'input.csv'

# <span style="color:#fa04d9">**Step 3: Create a Spark dataframe and cast x and y values to float**</span>

In [None]:
dfspark = spark.createDataFrame(dfpandas)

In [None]:
#dfspark = spark.read.csv(path, header=True) # requires spark 2.0. If the data was saved as a local csv file, we'd read it back this way instead of the cell above.
dfspark.show()

### Casting to float can be done in a couple of different ways

#### We can go through every column after the first one (the first column is the string id) and convert each of these "features" columns to float

In [None]:
dfspark_feat = dfspark.select(*(dfspark[c].cast("float").alias(c) for c in dfspark.columns[1:]))
dfspark_feat.show()

#### Alternatively, since we know the names of the feature columns, we can simply target them with the type conversion. We will keep the result of this approach going forward

In [None]:
for col in dfspark.columns:
    if col in FEATURES_COL:
        dfspark = dfspark.withColumn(col,dfspark[col].cast('float'))
dfspark.show()

#### Drop any potential null values

In [None]:
dfspark = dfspark.na.drop()
dfspark.show()

## The Spark implementation of many machine learning algorithms requires that all input columns be "concatenated" into a single input vector. This is done with a Vector Assembler transformer

<a id="vectorassembler"></a>
## <span style="color:green">Getting familiar with the SparkML Transformer: <a href="https://spark.apache.org/docs/latest/ml-features.html#vectorassembler">VectorAssembler</a> </span>

### VectorAssembler is a transformer that combines a given list of columns into a single vector column. It is useful for combining raw features and features generated by different feature transformers into a single feature vector, in order to train ML models like logistic regression and decision trees.
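
As a rough mental model, the plain-Python sketch below mimics what assembling does to a single row: scalar columns contribute one value each and vector columns contribute all of their elements, in the order given by the input column list. This is not Spark code; the helper `assemble_row` is hypothetical, and the sample row reuses the values from the VectorAssembler example in the Spark documentation.

```python
def assemble_row(row, input_cols):
    """Hypothetical helper: flatten the selected columns of one row into a single feature list."""
    features = []
    for col in input_cols:
        value = row[col]
        if isinstance(value, (list, tuple)):
            features.extend(float(v) for v in value)   # vector column: append every element
        else:
            features.append(float(value))              # scalar column: append one value
    return features

row = {"id": 0, "hour": 18, "mobile": 1.0, "userFeatures": [0.0, 10.0, 0.5], "clicked": 1.0}
print(assemble_row(row, ["hour", "mobile", "userFeatures"]))
# [18.0, 1.0, 0.0, 10.0, 0.5]
```

Columns that are not listed (here "id" and "clicked") are left out of the feature vector, which is why the notebook selects the id column separately after the transformation.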

<div class="panel-group" id="accordion-1">
  <div class="panel panel-default">
    <div class="panel-heading">
      <h4 class="panel-title">
        <a data-toggle="collapse" data-parent="#accordion-1" href="#collapse1-1">
        Click on this link to expand this cell, then copy and paste the code which will appear in a new cell just below, and execute that cell to see how VectorAssembler works. (You may subsequently delete that new cell and proceed with this notebook).</a>
      </h4>
    </div>
    <div id="collapse1-1" class="panel-collapse collapse">
      <div class="panel-body">
from pyspark.ml.linalg import Vectors <br>
from pyspark.ml.feature import VectorAssembler <br>
<br>
dataset = spark.createDataFrame( <br>
    [(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0)], <br>
    ["id", "hour", "mobile", "userFeatures", "clicked"]) <br>
<br>
assembler = VectorAssembler( <br>
    inputCols=["hour", "mobile", "userFeatures"], <br>
    outputCol="features") <br>
<br>
output = assembler.transform(dataset) <br>
print("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'") <br>
output.select("features", "clicked").show(truncate=False) <br>
      </div>
    </div>
  </div>

In [None]:
vecAssembler = VectorAssembler(inputCols=FEATURES_COL, outputCol="features")
df_kmeans = vecAssembler.transform(dfspark).select('id', 'features')
df_kmeans.show()

# <span style="color:#fa04d9">**Step 4: Decide the number of clusters 'k'**</span>

### One of the "downsides" of the k-means clustering algorithm is that it is not able to choose on its own the number of clusters, which has to be provided as input.
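
To see why k has to be supplied, it helps to look at the basic iteration behind k-means (often called Lloyd's algorithm): k appears in the very first step, when the initial centers are picked. The sketch below is a minimal plain-Python version written for illustration; it is not the Spark implementation (which, among other things, uses a different initialization), and the helper names `squared_distance` and `sketch_kmeans` are assumptions.

```python
import random

def squared_distance(p, q):
    """Squared Euclidean distance between two points given as lists."""
    return sum((a - b) ** 2 for a, b in zip(p, q))

def sketch_kmeans(points, k, iterations=20, seed=1):
    """Hypothetical helper: basic Lloyd iterations with k supplied by the caller."""
    rng = random.Random(seed)
    centers = rng.sample(points, k)                    # start from k distinct input points
    assignments = [0] * len(points)
    for _ in range(iterations):
        # Assignment step: attach every point to its nearest center
        assignments = [min(range(k), key=lambda j: squared_distance(p, centers[j]))
                       for p in points]
        # Update step: move every center to the mean of its assigned points
        for j in range(k):
            members = [p for p, a in zip(points, assignments) if a == j]
            if members:                                # keep the old center if a cluster is empty
                centers[j] = [sum(coord) / len(members) for coord in zip(*members)]
    return centers, assignments

points = [[0.0, 0.0], [0.0, 1.0], [1.0, 0.0], [10.0, 10.0], [10.0, 11.0], [11.0, 10.0]]
centers, assignments = sketch_kmeans(points, k=2)
print(centers)
print(assignments)
```

Nothing in these two steps ever adds or removes a center, so the algorithm can only refine the k clusters it was asked for.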

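### One way of choosing the number of clusters is to try a range of values and compare the resulting cost (the sum of squared distances from each point to its nearest cluster center). Because the cost keeps decreasing as k grows, we look for the value of k beyond which the improvement levels off, rather than for the lowest cost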
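
The cost in question is the sum of squared distances from each point to its nearest cluster center. The plain-Python sketch below computes it for a tiny hand-made data set and three hand-picked sets of centers; the helper name `wssse` and the sample values are assumptions made for this illustration.

```python
def wssse(points, centers):
    """Hypothetical helper: sum of squared distances from each point to its nearest center."""
    total = 0.0
    for p in points:
        total += min(sum((a - b) ** 2 for a, b in zip(p, c)) for c in centers)
    return total

points = [[0.0, 0.0], [0.0, 2.0], [10.0, 0.0], [10.0, 2.0]]
print(wssse(points, [[5.0, 1.0]]))                  # one center for everything: 104.0
print(wssse(points, [[0.0, 1.0], [10.0, 1.0]]))     # one center per natural group: 4.0
print(wssse(points, points))                        # one center per point: 0.0
```

The cost drops sharply when going from one to two centers and can always be pushed down to zero by using one center per point, so the smallest cost by itself says little. What matters is the value of k after which adding centers stops paying off.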

In [None]:
cost = np.zeros(20)
for k in range(2,20):
    kmeans = KMeans().setK(k).setSeed(1).setFeaturesCol("features")
    model = kmeans.fit(df_kmeans.sample(False,0.1, seed=42))
    cost[k] = model.computeCost(df_kmeans) # computeCost is available in Spark 2.x only; it was removed in Spark 3.0

### Find the "elbow" in the cost curve

In [None]:
fig, ax = plt.subplots(1,1, figsize =(8,6))
ax.plot(range(2,20),cost[2:20])
ax.set_xlabel('k')
ax.set_ylabel('cost')

### It seems from the curve above that the ideal value of k is around 6.

# <span style="color:#fa04d9">**Step 5: Run k-means with 6 clusters**</span>

In [None]:
k = 6
kmeans = KMeans().setK(k).setSeed(1).setFeaturesCol("features")
model = kmeans.fit(df_kmeans)
centers = model.clusterCenters()

print("Cluster Centers: ")
for center in centers:
    print(center)

In [None]:
transformed = model.transform(df_kmeans).select('id', 'prediction')
transformed.show(5)

# <span style="color:#fa04d9">**Step 6: Join the predictions results with the original dataframe**</span>

In [None]:
df_pred = transformed.join(dfspark, 'id')
df_pred.show(5)

In [None]:
df_pred_pandas = df_pred.toPandas().set_index('id')
df_pred_pandas.head()

In [None]:
# add_subplot replaces gca(projection=...), which recent matplotlib releases no longer accept
myplot = plt.figure(figsize=(12,10)).add_subplot(projection='rectilinear')
myplot.scatter(df_pred_pandas.x, df_pred_pandas.y, c=df_pred_pandas.prediction)
myplot.set_xlabel('x')
myplot.set_ylabel('y')
plt.show()

In [None]:
# Plot the original labels for comparison with the predicted clusters
myplot = plt.figure(figsize=(12,10)).add_subplot(projection='rectilinear')
myplot.scatter(X[:,0], X[:,1], c=y)
myplot.set_xlabel('x')
myplot.set_ylabel('y')
plt.show()

# <span style="color:#fa04d9"><center>**PART 2: Run k-means with a customers dataset**</center></span>

# <span style="color:#fa04d9">**Step 7: Download the customer data**</span>

### The cell below is used when the data file is loaded locally into the Watson Studio object storage. Not used in this scenario

In [None]:
#import ibmos2spark

# @hidden_cell
#credentials = {
#    'endpoint': 'https://s3-api.us-geo.objectstorage.service.networklayer.com',
#    'api_key': 'ymzV2WGV_YuRqjW-ysK1LqbbCrwKmhRmCtF8BFPP1aMz',
#    'service_id': 'iam-ServiceId-b15106ac-8f38-4585-acbc-dd19c7d847c9',
#    'iam_service_endpoint': 'https://iam.ng.bluemix.net/oidc/token'}

#configuration_name = 'os_a36dabc7e3cb4c1b9971724a79e7f4ee_configs'
#cos = ibmos2spark.CloudObjectStorage(sc, credentials, configuration_name, 'bluemix_cos')

#from pyspark.sql import SparkSession
#spark = SparkSession.builder.getOrCreate()
#customers = spark.read\
#  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
#  .option('header', 'true')\
#  .option('inferschema', 'true')\
#  .load(cos.url('CustomerDataSegmentation.csv', 'm32-donotdelete-pr-6sjzl2grahnlag'))
#customers.take(5)


In [None]:
#Run once to install the wget package
!pip install wget

In [None]:
import wget
url_customer='https://raw.githubusercontent.com/DScienceAtScale/K-means/master/data/CustomerDataSegmentation.csv'

In [None]:
!rm -f CustomerDataSegmentation.csv

customerFilename=wget.download(url_customer)

#list existing files
!ls -l CustomerDataSegmentation.csv

In [None]:
customers = spark.read.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").option("header", "true").option("inferSchema", "true").load(customerFilename)

In [None]:
customers.show(5)

# <span style="color:#fa04d9">**Step 8: Some data preparation**</span>

### We are going to put the AGE and INCOME columns into 3 distinct buckets (bucket values will be 0, 1, 2). This is referred to as binning or bucketizing. We can later rerun the clustering with different numbers of buckets.
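
As an illustration of what bucketizing into 3 quantile-based bins means, the plain-Python sketch below computes two cut points from a list of values and maps each value to bucket 0, 1 or 2. It is not Spark's QuantileDiscretizer, which computes approximate quantiles on a distributed dataframe; the helper names `fit_splits` and `to_bucket` and the sample ages are assumptions made for this example.

```python
from bisect import bisect_right
from statistics import quantiles

def fit_splits(values, num_buckets):
    """Hypothetical helper: cut points that divide the values into num_buckets groups."""
    return quantiles(values, n=num_buckets, method="inclusive")

def to_bucket(value, splits):
    """Bucket index from 0 to len(splits); a value equal to a split goes to the upper bucket."""
    return bisect_right(splits, value)

ages = [18, 22, 25, 31, 38, 44, 52, 60, 71]
splits = fit_splits(ages, num_buckets=3)
print(splits)                                   # two cut points for three buckets
print([to_bucket(a, splits) for a in ages])     # [0, 0, 0, 1, 1, 1, 2, 2, 2]
```

Changing `num_buckets` changes the number of cut points, which is what rerunning the clustering with a different number of buckets amounts to.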

In [None]:
from pyspark.ml.feature import QuantileDiscretizer

In [None]:
from pyspark.sql.types import DoubleType
customers=customers.withColumn("AGE", customers["AGE"].cast(DoubleType()))
customers=customers.withColumn("INCOME", customers["INCOME"].cast(DoubleType()))

In [None]:
customers = QuantileDiscretizer(numBuckets=3, inputCol="AGE",outputCol="AGE_BINS").fit(customers).transform(customers)
customers = QuantileDiscretizer(numBuckets=3, inputCol="INCOME",outputCol="INCOME_BINS").fit(customers).transform(customers)

### We define a function to correct some cases where the marital status is undefined. We will (arbitrarily) decide that if the household has 2 or more members then the status is 'Married', otherwise 'Single'

In [None]:
def convertms(MARITAL_STATUS, MEMBERS_IN_HOUSEHOLD):
    # Keep statuses that are already defined
    if MARITAL_STATUS in ("S", "M"):
        return MARITAL_STATUS
    # Undefined status: assume 'Married' for households of 2 or more members
    if MARITAL_STATUS == "U" and MEMBERS_IN_HOUSEHOLD >= 2:
        return "M"
    # Everything else falls back to 'Single'
    return "S"

In [None]:
from pyspark.sql.functions import udf
convertmsudf = udf(convertms)

### The dataframe customers2 has a derived marital status field, where we attempted to correct undefined marital statuses.

In [None]:
customers2 = customers.withColumn("MARITAL_STATUS_DERIVED", convertmsudf(customers["MARITAL_STATUS"], customers['MEMBERS_IN_HOUSEHOLD']))

### Apply String Indexers to some string columns which we want to use as input to the clustering logic
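
Conceptually, a string indexer learns the list of distinct labels in a column and replaces each label by its position in that list, with the most frequent label receiving index 0. The plain-Python sketch below mimics this behaviour on a small list. It is not Spark code: the helper names `fit_labels` and `index_values` are hypothetical, and breaking ties alphabetically is an assumption made for this sketch.

```python
from collections import Counter

def fit_labels(values):
    """Hypothetical helper: labels ordered by descending frequency, ties broken alphabetically."""
    counts = Counter(values)
    return sorted(counts, key=lambda label: (-counts[label], label))

def index_values(values, labels):
    """Replace every label by its position in the fitted label list, as a float."""
    position = {label: float(i) for i, label in enumerate(labels)}
    return [position[v] for v in values]

statuses = ["M", "S", "M", "M", "S", "M"]
labels = fit_labels(statuses)
print(labels)                              # ['M', 'S']
print(index_values(statuses, labels))      # [0.0, 1.0, 0.0, 0.0, 1.0, 0.0]
```

The fitted label list plays the same role as the `labels` attribute of a fitted indexer: it is what lets us translate an index back into the original string later on.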

In [None]:
from pyspark.ml.feature import StringIndexer

indexer_ms = StringIndexer(inputCol="MARITAL_STATUS_DERIVED", outputCol="MARITAL_STATUS_Indexed").fit(customers2)
customers2 = indexer_ms.transform(customers2) 
customers2.select(['CUST_ID', 'NAME', 'AGE', 'GENDER', 'MARITAL_STATUS_DERIVED', 'EDUCATION', 'MARITAL_STATUS_Indexed']).show()

<div class="panel-group" id="accordion-2">
  <div class="panel panel-default">
    <div class="panel-heading">
      <h4 class="panel-title">
        <a data-toggle="collapse" data-parent="#accordion-2" href="#collapse1-2">
        Repeat the string indexer transformation for the GENDER column. Click on this hint to copy / paste the answer if needed.</a>
      </h4>
    </div>
    <div id="collapse1-2" class="panel-collapse collapse">
      <div class="panel-body">
indexer_gender = StringIndexer(inputCol="GENDER", outputCol="GENDER_Indexed").fit(customers2) <br>
customers2 = indexer_gender.transform(customers2) <br>
customers2.select(['CUST_ID', 'NAME', 'AGE', 'GENDER', 'MARITAL_STATUS_DERIVED', 'EDUCATION', 'GENDER_Indexed']).show()<br>
      </div>
    </div>
  </div>

#### <span style="color:red">Warning, the blank cell above is missing required code. You need to either add the string indexer transformation for the GENDER column as per previous examples, or use the hint in the cell just above the previous one to add the missing code before proceeding with the rest of the notebook</span>

<div class="panel-group" id="accordion-3">
  <div class="panel panel-default">
    <div class="panel-heading">
      <h4 class="panel-title">
        <a data-toggle="collapse" data-parent="#accordion-3" href="#collapse1-3">
        Repeat the string indexer transformation for the EDUCATION column. Click on this hint to copy / paste the answer if needed.</a>
      </h4>
    </div>
    <div id="collapse1-3" class="panel-collapse collapse">
      <div class="panel-body">
indexer_education = StringIndexer(inputCol="EDUCATION", outputCol="EDUCATION_Indexed").fit(customers2) <br>
customers2 = indexer_education.transform(customers2) <br>
customers2.select(['CUST_ID', 'NAME', 'AGE', 'GENDER', 'MARITAL_STATUS_DERIVED', 'EDUCATION', 'EDUCATION_Indexed']).show()<br>
      </div>
    </div>
  </div>

#### <span style="color:red">Warning, the blank cell above is missing required code. You need to either add the string indexer transformation for the EDUCATION column as per previous examples, or use the hint in the cell just above the previous one to add the missing code before proceeding with the rest of the notebook</span>

<div class="panel-group" id="accordion-4">
  <div class="panel panel-default">
    <div class="panel-heading">
      <h4 class="panel-title">
        <a data-toggle="collapse" data-parent="#accordion-4" href="#collapse1-4">
        Repeat the string indexer transformation for the PROFESSION column. Click on this hint to copy / paste the answer if needed.</a>
      </h4>
    </div>
    <div id="collapse1-4" class="panel-collapse collapse">
      <div class="panel-body">
indexer_profession = StringIndexer(inputCol="PROFESSION", outputCol="PROFESSION_Indexed").fit(customers2)<br>
customers2 = indexer_profession.transform(customers2)<br>
customers2.select(['CUST_ID', 'NAME', 'AGE', 'GENDER', 'MARITAL_STATUS_DERIVED', 'EDUCATION', 'PROFESSION', 'PROFESSION_Indexed']).show()<br>
      </div>
    </div>
  </div>

#### <span style="color:red">Warning, the blank cell above is missing required code. You need to either add the string indexer transformation for the PROFESSION column as per previous examples, or use the hint in the cell just above the previous one to add the missing code before proceeding with the rest of the notebook</span>

In [None]:
#customers2=customers2.withColumn("agetile", customers2["agetile"].cast(DoubleType()))
#customers2=customers2.withColumn("incometile", customers2["incometile"].cast(DoubleType()))
#customers2=customers2.withColumn("GENDER_Indexed", customers2["GENDER_Indexed"].cast(DoubleType()))
#customers2=customers2.withColumn("MARITAL_STATUS_Index", customers2["MARITAL_STATUS_Index"].cast(DoubleType()))
#customers2=customers2.withColumn("EDUCATION_Indexed", customers2["EDUCATION_Indexed"].cast(DoubleType()))
#customers2=customers2.withColumn("PROFESSION_Indexed", customers2["PROFESSION_Indexed"].cast(DoubleType()))

In [None]:
FEATURES_COL = ['AGE_BINS','INCOME_BINS','GENDER_Indexed', 'MARITAL_STATUS_Indexed', 'EDUCATION_Indexed']

### <span style="color:blue">Mini data frame manipulation exercise: Can you check how many rows were switched from a marital status of 'U' to either 'M' or 'S'?</span>

<div class="panel-group" id="accordion-5">
  <div class="panel panel-default">
    <div class="panel-heading">
      <h4 class="panel-title">
        <a data-toggle="collapse" data-parent="#accordion-5" href="#collapse1-5">
        Check how many rows had the marital status of 'U' changed to 'M' or 'S'. Click on this hint to see one possible way to query the answer. You can copy / paste the answer in the empty cell below and execute it</a>
      </h4>
    </div>
    <div id="collapse1-5" class="panel-collapse collapse">
      <div class="panel-body">
customers2.filter(customers2["MARITAL_STATUS"]!=customers2["MARITAL_STATUS_DERIVED"]).count()<br>
      </div>
    </div>
  </div>

### <span style="color:blue">Mini data frame manipulation exercise: How many customers are left with the 'U' marital status?</span>

<div class="panel-group" id="accordion-6">
  <div class="panel panel-default">
    <div class="panel-heading">
      <h4 class="panel-title">
        <a data-toggle="collapse" data-parent="#accordion-6" href="#collapse1-6">
        Check how many rows are left with the 'U' marital status. Click on this hint to see one possible way to query the answer. You can copy / paste the answer in the empty cell below and execute it</a>
      </h4>
    </div>
    <div id="collapse1-6" class="panel-collapse collapse">
      <div class="panel-body">
customers2.filter(customers2["MARITAL_STATUS_DERIVED"]=='U').count()<br>
      </div>
    </div>
  </div>

# <span style="color:#fa04d9">**Step 9: Build the feature vector and call the k-means algorithm**</span>

### Build a df_kmeans dataframe that will have the vector of features ready to be ingested by the Spark k-means algorithm

<div class="panel-group" id="accordion-7">
  <div class="panel panel-default">
    <div class="panel-heading">
      <h4 class="panel-title">
        <a data-toggle="collapse" data-parent="#accordion-7" href="#collapse1-7">
        Build a vector of features that will be passed into the k-means algorithm. Click on this hint to see a code example that can be used, which you can copy / paste into a blank cell below.</a>
      </h4>
    </div>
    <div id="collapse1-7" class="panel-collapse collapse">
      <div class="panel-body">
vecAssembler = VectorAssembler(inputCols=FEATURES_COL, outputCol="features")<br>
df_kmeans = vecAssembler.transform(customers2).select('CUST_ID','features')<br>
df_kmeans.show()<br>
      </div>
    </div>
  </div>

#### <span style="color:red">Warning, the blank cell above is missing required code. You need to either add the vector assembler transformation to produce the df_kmeans dataframe as per previous examples, or use the hint in the cell just above the previous one to add the missing code before proceeding with the rest of the notebook</span>

### Investigate the optimal number of clusters...

#### <span style="color:red">Warning, the couple of cells below need to fill an actual value for the token xxxfill_valuexxx . Based on the generic example covered in the first half of this notebook, proceed with some trials to identify a value for the number of clusters to use in the rest of this lab...</span>

In [None]:
cost = np.zeros(xxxfill_valuexxx)
for k in range(2,xxxfill_valuexxx):
    kmeans = KMeans().setK(k).setSeed(1).setFeaturesCol("features")
    model = kmeans.fit(df_kmeans.sample(False,0.2, seed=42))
    cost[k] = model.computeCost(df_kmeans) # computeCost is available in Spark 2.x only; it was removed in Spark 3.0

In [None]:
fig, ax = plt.subplots(1,1, figsize =(8,6))
ax.plot(range(2,xxxfill_valuexxx),cost[2:xxxfill_valuexxx])
ax.set_xlabel('k')
ax.set_ylabel('cost')

#### <span style="color:red">Warning, the cell below needs a value for k before being run. Please replace the token xxxfill_valuexxx with an actual number based on findings from running the previous two cells. </span>

In [None]:
k = xxxfill_valuexxx
kmeans = KMeans().setK(k).setSeed(1).setFeaturesCol("features")
model = kmeans.fit(df_kmeans)
centers = model.clusterCenters()

print("Cluster Centers: ")
for center in centers:
    print(center)

### Transform the df_kmeans dataframe to assign a prediction to each data point (i.e. to each customer)

In [None]:
transformed = model.transform(df_kmeans).select('CUST_ID', 'prediction')
rows = transformed.collect()
print(rows[:3])

### Similar to what was previously done, we will now join the predictions with the original rows through the CUST_ID field

In [None]:
df_pred = transformed.join(customers2, 'CUST_ID')
df_pred.select(['CUST_ID', 'NAME', 'AGE', 'GENDER', 'MARITAL_STATUS', 'EDUCATION', 'INCOME_BINS', 'prediction']).show()
#df_pred.select(['CUST_ID', 'customers2.NAME', 'customers2.AGE', 'customers2.GENDER', 'customers2.MARITAL_STATUS', 'customers2.EDUCATION', 'prediction']).show()

In [None]:
cluster_number = 2

### Pick one cluster and check how many customers it contains

In [None]:
df_pred.filter(df_pred["prediction"]==cluster_number).select('CUST_ID', 'NAME', 'AGE', 'GENDER', 'MARITAL_STATUS_DERIVED', 'MEMBERS_IN_HOUSEHOLD', 'EDUCATION', 'AGE_BINS', 'INCOME_BINS', 'prediction').count()

### We can also look at the customers in that same cluster

In [None]:
df_pred.filter(df_pred["prediction"]==cluster_number).select('CUST_ID', 'NAME', 'AGE', 'GENDER', 'MARITAL_STATUS_DERIVED', 'MEMBERS_IN_HOUSEHOLD', 'EDUCATION', 'AGE_BINS', 'INCOME_BINS', 'prediction').show(1000)

### <span style="color:blue">Exercise: By varying the cluster_number variable above, can you start analyzing the content of each cluster and the type of customers that it contains? Do the clusters look homogeneous to you? If not, what should be your next steps? </span>

### We can also take a look at the cluster attributes visually...

### Convert the predictions dataframe to Pandas.

In [None]:
df_pred_pandas = df_pred.select('prediction', 'AGE_BINS', 'INCOME_BINS', 'GENDER_Indexed', 'MARITAL_STATUS_Indexed', 'EDUCATION_Indexed').filter(df_pred.prediction ==1).toPandas()

In [None]:
df_pred_pandas.head()

### Use parallel coordinates to visualize the cluster attributes. Note, it is possible to show more than one cluster in the same graph by modifying the 'prediction' predicate in the df_pred_pandas dataframe definition a couple of cells above.

In [None]:
from pandas.plotting import parallel_coordinates  # pandas.tools.plotting no longer exists in current pandas
plt.figure(figsize=(12,5))
pc = parallel_coordinates(df_pred_pandas, 'prediction', color=('red', 'green', 'blue', 'yellow', 'purple'))

### Print the values of the various Indexed attributes to recall what they correspond to

In [None]:
print("marital status:",indexer_ms.labels, "gender:", indexer_gender.labels, "education:", indexer_education.labels)

### <span style="color:blue">Exercise: Consider rerunning the clustering with a different number of buckets for the AGE and the INCOME columns.</span>

## Congratulations, you have reached the end of this notebook. This is only the beginning of the process, though: there is always a lot more to do!

**For questions or feedback, please contact:<br>
Mokhtar Kandil.<br>
mkandil@ca.ibm.com**<br>
IBM DTE (Digital Technical Engagements) Big Data and Data Science<br>
August 2018.