In [1]:
from pyspark.mllib.linalg import Matrices
import numpy as np
import matplotlib.pyplot as plt

# Seed NumPy's global random generator so the np.random draws made later in
# this notebook give the same samples on every run.
np.random.seed(2017)

In this notebook, we'll implement the distributed version of the Affinity Propagation Clustering Algorithm.

Let's begin by testing Spark and checking everything is ok before proceeding... This example will be deleted later.
An interesting link is the following: http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/

Let's begin by implementing a word count example using Spark and mimicking the Map/Reduce paradigm. We'll adapt this code to our problem later.

In [None]:
# SparkContext is automatically created in the object named "sc"

# Read the text file (one RDD element per line)
text_file = sc.textFile("Les_Miserables.txt")

# Split the text into words. str.split() without an argument splits on any
# run of whitespace and never yields empty strings, whereas split(" ") emits
# an empty "word" for every repeated space and for blank lines.
tokenized = text_file.flatMap(lambda line: line.split())

# Mapper part: Associate the weight 1 to each word
wordCountsMapper = tokenized.map(lambda word: (word, 1))

# Reducer part: Count the word occurrences by summing the weights per key
wordCountsReducer = wordCountsMapper.reduceByKey(lambda v1, v2: v1 + v2)

# Display only the 20 most frequent words instead of pulling the whole
# vocabulary back to the driver and flooding the cell output
print(wordCountsReducer.takeOrdered(20, key=lambda pair: -pair[1]))

Actual Affinity Propagation Clustering algorithm:

The following class defines a mapper for the Affinity Propagation Clustering Algorithm.

In [None]:
class Mapper:
    """
    This class defines a mapper object that computes the partial centroids
    (exemplars) for the partial dataset given in arguments.

    The samples are copied into a local NumPy array when the object is built,
    so a Mapper keeps no reference to Spark objects and all the message
    passing is done with NumPy on whichever process owns the instance.
    """

    def __init__(self, data):
        """
        This is the class' constructor.

        Parameters
        ----------
        data : Spark DataFrame / RDD, or array-like of shape (N, d)
                The samples to cluster. An object exposing ``collect()`` is
                collected and every collected row becomes one sample; anything
                else is converted with ``np.asarray``. A 1-D input is treated
                as N one-dimensional samples.
        """

        if hasattr(data, "collect"):
            # Spark object: bring the rows to this process once and for all
            points = np.array([[e for e in row] for row in data.collect()], dtype=float)
        else:
            points = np.asarray(data, dtype=float)

        if points.ndim < 2:
            points = points.reshape(-1, 1)

        # Dataset (local NumPy copy, one sample per row)
        self.__data = points

        # Number of samples in the dataset
        self.__N = points.shape[0]

        # Availabilities matrix
        self.__A = np.zeros((self.__N, self.__N))

        # Responsabilities matrix
        self.__R = np.zeros((self.__N, self.__N))

        # Similarity matrix
        self.__S = np.zeros((self.__N, self.__N))

        # List of centroids
        self.__centers = []

    def a(self, i, k):
        """
        This method computes the availability a(i, k), i.e. the evidence sent
        from candidate exemplar k to point i, from the current responsabilities.

        Parameters
        ----------
        i : integer
                Index of the point receiving the message.

        k : integer
                Index of the candidate exemplar.

        Returns
        -------
        a : float
                For i != k: min(0, R[k, k] + sum of max(0, R[i', k]) over every
                i' different from i and k). For i == k: sum of max(0, R[i', k])
                over every i' different from k.
        """

        # Rows taking part in the sum: everything except k (and i when i != k)
        others = np.ones(self.__N, dtype=bool)
        others[k] = False
        if i != k:
            others[i] = False

        support = np.maximum(0, self.__R[others, k]).sum()

        if i != k:
            return min(0, self.__R[k, k] + support)
        return support

    def r(self, i, k):
        """
        This method computes the responsability r(i, k) sent from point i to
        candidate exemplar k, from the current availabilities and similarities.

        Parameters
        ----------
        i : integer
                Index of the point sending the message.

        k : integer
                Index of the candidate exemplar.

        Returns
        -------
        r : float
                S[i, k] minus the largest A[i, k'] + S[i, k'] over k' != k.

        Raises
        ------
        ValueError
                If there is no competing candidate k' (fewer than two samples).
        """

        competitors = np.delete(self.__A[i] + self.__S[i], k)
        return self.__S[i, k] - competitors.max()

    def s(self, x_i, x_k):
        """
        This method computes the similarity between two points (negative
        squared Euclidean distance).

        Parameters
        ----------
        x_i : numpy array
                This is the ith point of the dataset.

        x_k : numpy array
                This is the kth point of the dataset.

        Returns
        -------
        s : float
                This is the similarity between points i and k.
        """

        return -np.sum((x_i - x_k) ** 2)

    def __GenerateSimilarityMatrix(self):
        """
        This method generates the similarity matrix for all the points given to
        the mapper and stores it in the instance.

        Off-diagonal entries are the negative squared distances between
        samples. Every diagonal entry (the "preference") is set to the median
        of the off-diagonal entries; with fewer than two samples there is no
        off-diagonal entry and the diagonal is left at zero.

        Parameters
        ----------
        None

        Returns
        -------
        None
        """

        points = self.__data

        # Pairwise differences via broadcasting: shape (N, N, d)
        differences = points[:, np.newaxis, :] - points[np.newaxis, :, :]
        self.__S = -np.sum(differences ** 2, axis=2)

        # For diagonal: compute "preferences". The diagonal is excluded by
        # position rather than by value, so duplicated samples (similarity 0)
        # still count in the median.
        if self.__N > 1:
            offDiagonal = ~np.eye(self.__N, dtype=bool)
            np.fill_diagonal(self.__S, np.median(self.__S[offDiagonal]))

    def ExecuteAffinityPropagation(self, iterations, lambdaValue = 0.5):
        """
        This method executes the Affinity Propagation algorithm on several iterations.

        Parameters
        ----------
        iterations : positive integer
                This is the number of iterations the algorithm will be executed.

        lambdaValue : float
                This is the lambda (damping factor) specified in the paper: each
                message becomes (1 - lambda) * new value + lambda * old value.

        Returns
        -------
        self.__centers : list
                Indices i of the samples with R[i, i] + A[i, i] > 0 after the
                last iteration. An empty dataset gives [] and a single sample
                gives [0] (it is trivially its own exemplar).
        """

        # Degenerate datasets: no message can be exchanged
        if self.__N < 2:
            self.__centers = list(range(self.__N))
            return self.__centers

        # Compute the similarity matrix
        self.__GenerateSimilarityMatrix()

        for it in range(iterations):
            # Update r(i, k) given a(i, k); r() only reads A and S, so R can be
            # updated in place
            for i in range(self.__N): # For each row
                for k in range(self.__N): # For each column
                    self.__R[i, k] = (1 - lambdaValue) * self.r(i, k) + lambdaValue * self.__R[i, k]

            # Update a(i, k) given r(i, k); a() only reads R, so A can be
            # updated in place
            for i in range(self.__N): # For each row
                for k in range(self.__N): # For each column
                    self.__A[i, k] = (1 - lambdaValue) * self.a(i, k) + lambdaValue * self.__A[i, k]

            # Combine both a(i, k) and r(i, k) to get centers
            self.__centers = [i for i in range(self.__N) if self.__R[i, i] + self.__A[i, i] > 0]

        return self.__centers

Let's use the class with Spark.

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import Row

# Start (or reuse) the SparkSession for this notebook # Only for Spark 2
spark = (
    SparkSession.builder
    .appName("Affinity Propagation algorithm")
    .getOrCreate()
)

# Load the CSV file, using its first line as the column names
irisData = spark.read.option("header", "true").csv("iris_species.csv")

# Keep the four measurement columns only
featureColumns = ["SepalLengthCm", "SepalWidthCm", "PetalLengthCm", "PetalWidthCm"]
irisData = irisData.select(*featureColumns)

# Cast every kept column to double, one column at a time
for columnName in featureColumns:
    irisData = irisData.withColumn(columnName, irisData[columnName].cast("double"))

Let's see what our data looks like:

In [None]:
# Print the column names and data types of irisData
irisData.printSchema()

Let's see how many samples we have.

In [None]:
# Count the rows of irisData; as the last expression, the result is shown as the cell output
irisData.count()

We'll now split the dataframe to feed several mappers.

In [None]:
# We do 5 splits with 20% data in each. The explicit seed makes the split
# identical on every run; without it each execution samples different rows.
splittedDataframe = irisData.randomSplit([0.2, 0.2, 0.2, 0.2, 0.2], seed=2017)
#mappersArray = [Mapper(splittedDataframe[i]) for i in range(5)]

# Maybe we can do a Spark version of this loop
#for m in range(len(mappersArray)): print(mappersArray[m].ExecuteAffinityPropagation(100))

# NOTE(review): the wildcard import presumably provides the Mapper class
# defined in Mapper.py (called below with the SparkContext as a second
# argument) -- verify against that file.
from Mapper import *
sc.addPyFile("Mapper.py")

# Run 100 iterations on the first split; the returned value is displayed as
# the cell output
mc = Mapper(splittedDataframe[0], sc)
mc.ExecuteAffinityPropagation(100)

In [4]:
# NOTE(review): wildcard import -- presumably brings in the Mapper class from
# Mapper.py; verify against that file.
from Mapper import *
# Register Mapper.py as a dependency of the SparkContext
sc.addPyFile("Mapper.py")

splittedDataframe = irisData.randomSplit([0.2, 0.2, 0.2, 0.2, 0.2]) # We do 5 splits with 20% data in each
# Build one Mapper per split and try to distribute the list of Mapper objects.
# NOTE(review): the output recorded for this cell is a Py4JError saying that
# __getstate__ does not exist on a Java object -- presumably raised while
# pickling the Mapper objects, each of which was given a Spark DataFrame;
# confirm before relying on this cell.
tmp = sc.parallelize([Mapper(splittedDataframe[i]) for i in range(5)])

Py4JError: An error occurred while calling o70.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
	at py4j.Gateway.invoke(Gateway.java:272)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)



In [None]:


def s(x_i, x_k):
    """
    Similarity between two points: the negative sum of squared differences.

    Parameters
    ----------
    x_i : numpy array
            First point.

    x_k : numpy array
            Second point.

    Returns
    -------
    float
            -sum((x_i - x_k) ** 2)
    """
    offset = x_i - x_k
    squaredDistance = np.sum(offset ** 2)
    return -squaredDistance

# Bring the first split to the driver as NumPy vectors, then redistribute it
# as a plain RDD so that cartesian() can be used on it
tmp = sc.parallelize(splittedDataframe[0].rdd.map(lambda x: np.array([e for e in x])).collect())
indexed = tmp.zipWithIndex()

# Keep the (row, column) indices next to every similarity: cartesian() emits
# the pairs partition block by partition block, so the position of a value in
# the collected list does not tell which matrix cell it belongs to.
res = indexed.cartesian(indexed).map(
    lambda X: (X[0][1], X[1][1], s(X[0][0], X[1][0]) if X[0][1] != X[1][1] else 0.0)
).collect()

# Size the matrix from the very RDD the pairs were built from
dim = tmp.count()
S = np.zeros((dim, dim))
for row, col, value in res:
    S[row, col] = value

# For diagonal: compute "preferences" once, as the median of the off-diagonal
# similarities. The diagonal is excluded by position, so duplicated samples
# (similarity 0) still count in the median.
if dim > 1:
    offDiagonal = ~np.eye(dim, dtype=bool)
    np.fill_diagonal(S, np.median(S[offDiagonal]))

In [None]:
rdd = sc.parallelize(range(1, 10))
nrow = int(rdd.count() ** 0.5) # Compute number of rows

# Cut the flat RDD into consecutive chunks of `nrow` elements:
#   - the chunk number is the element's position divided by nrow,
#   - sortByKey() restores the chunk order, which groupBy does not guarantee,
#   - inside a chunk the elements are ordered by their original position.
res = (
    rdd.zipWithIndex()
    .groupBy(lambda X: X[1] // nrow)
    .sortByKey()
    .mapValues(lambda vals: [y[0] for y in sorted(vals, key=lambda y: y[1])])
    .values()
    .collect()
)

# An RDD has no toPandas() method; show the reshaped result as a matrix instead
np.array(res)

In [None]:
# For diagonal: compute "preferences"
# This cell runs at notebook level, where `self` does not exist, so it works
# on the module-level similarity matrix `S` and its size `dim` built in the
# similarity-matrix cell above.
# The diagonal is excluded by position (not by value), which keeps the cell
# idempotent: re-running it yields the same diagonal.
if dim > 1:
    offDiagonal = ~np.eye(dim, dtype=bool)
    np.fill_diagonal(S, np.median(S[offDiagonal]))

In [None]:
# Bring the two sepal columns of the first split to the driver as a pandas frame
plotData = splittedDataframe[0].select("SepalLengthCm", "SepalWidthCm").toPandas()

# Scatter plot with labelled axes so the figure can be read on its own
fig, ax = plt.subplots()
ax.scatter(plotData["SepalLengthCm"], plotData["SepalWidthCm"])
ax.set_xlabel("SepalLengthCm")
ax.set_ylabel("SepalWidthCm")
ax.set_title("First split: sepal width vs. sepal length")
plt.show()

Now, let's map each data frame to a mapper.

In [None]:
# Pair every row of the first split with the constant 1 and collect the
# resulting (row, 1) tuples to the driver
splittedDataframe[0].rdd.map(lambda x: (x, 1)).collect()

In [None]:
# Temporary cell. To be deleted later

# Draw three 2-D Gaussian blobs with N_PER_CLUSTER points each
N_PER_CLUSTER = 10
x_1 = np.random.multivariate_normal(mean = [0,0], cov = [[0.1,0],[0,0.1]], size = N_PER_CLUSTER)
x_2 = np.random.multivariate_normal(mean = [3,0], cov = [[0.3,-0.1],[-0.1,0.2]], size = N_PER_CLUSTER)
x_3 = np.random.multivariate_normal(mean = [0,3], cov = [[0.2,0],[0,0.2]], size = N_PER_CLUSTER)
X = np.concatenate((x_1, x_2, x_3))

# Total number of samples (3 * N_PER_CLUSTER), kept under its original name
N = len(X)

# Raw data
fig, ax = plt.subplots()
ax.scatter(X[:,0], X[:,1])
ax.set_xlabel("x")
ax.set_ylabel("y")
ax.set_title("Synthetic dataset")
plt.show()

# NOTE(review): X is a NumPy array, not a Spark DataFrame -- confirm that the
# Mapper in scope accepts it.
m1 = Mapper(X)
m1Centroids = m1.ExecuteAffinityPropagation(100)

# Integer index array, so that an empty list of centroids still indexes cleanly
centroidIdx = np.asarray(m1Centroids, dtype=int)

# Data with the centroids returned by the mapper highlighted
fig = plt.figure()
ax1 = fig.add_subplot(111)

ax1.scatter(X[:, 0], X[:, 1], s = 10, c = 'b', marker = "s", label = 'data')
ax1.scatter(X[centroidIdx, 0], X[centroidIdx, 1], s = 30, c = 'r', marker = "o", label = 'centroids')
ax1.set_xlabel("x")
ax1.set_ylabel("y")
ax1.set_title("Affinity Propagation centroids")
ax1.legend(loc = 'upper right')
plt.show()