# Clustering

### Code

In [None]:
from pyspark.mllib.clustering import KMeans, KMeansModel
from numpy import array
from math import sqrt
from operator import add
import numpy as np

from pyspark import SparkContext
sc = SparkContext(appName="k-mean")

############################################################## Load and prepare the data
data = sc.textFile("...")
#print(data.take(10))

# get the discription and all states info
f = open("...",'r')
states = []
for line in f:
    state = line.split(' ')
    states.append(state[0])
f.close()
states.sort()
#print(states)

def vectorization(plants):
    line = plants.split(',')
    vector = []
    vector.append(line[0])
    for i in range(len(states)):
        if states[i] in line[1:]:
            vector.append(float(1))
        else:
            vector.append(float(0))
    return vector[1:]
dataVector = data.map(vectorization)
#print(dataVector.take(10))

###################################################### normalization
countState = []
for i in range(len(states)):
    print('-------------- ' + str(i) + '/' + str(len(states)) + ' --------------')
    countState.append(dataVector.map(lambda x: x[i]).reduce(add))
countStates = np.array(countState)
normalizedVector = dataVector.map(lambda x: x/countStates).map(lambda x : x[~np.isnan(x)])

print(normalizedVector.take(10))

######################################################## build the model (cluster the data)



# function to evaluate clustering by computing Within Set Sum of Squared Errors
def error(point):
    center = clusters.centers[clusters.predict(point)]   
    return  sqrt(sum([x**2 for x in (point - center)]))
    
#print(parsedData.map(error).collect() )


file = open("Clutering.txt","w")
SSSEList = []
ModelList = []

# try different k clusters
for k in [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]:
    bestSSE = np.inf
# for each k, try 10 times to reduce local optimal effect
     
    for ite in range(10):
        clusters = KMeans.train(normalizedVector, k, maxIterations=30, runs=30, initializationMode="random")
        WSSSE = normalizedVector.map(lambda point: error(point)).reduce(lambda x, y: x + y)
        print("k = "+str(k)+"  Sum of  Error = " + str(WSSSE))
        if WSSSE < bestSSE:
            bestSSE = WSSSE
            bestModel = clusters
    SSSEList.append(bestSSE)
    ModelList.append(bestModel)
    file.write("k = "+str(k)+"  Sum of  Error = " + str(bestSSE) + '\n')
file.close()
sc.stop()

### Centroids for "optimal" k

In [25]:
file = open('centroids.txt','r')
for line in file:
    print(line) 
file.close()

[  1.90870140e-04   2.92123245e-04   1.41890826e-06   1.75501408e-06

   2.14858466e-05   1.51688657e-04   1.92633688e-05   6.83964150e-05

   1.84254495e-05   1.05072921e-06   1.78305562e-06   0.00000000e+00

   4.88785071e-07   5.61662521e-05   1.08927834e-06   4.25374074e-07

   1.81662218e-05   6.75138072e-05   1.22134306e-05   8.74661069e-06

   7.52809863e-06   2.84192839e-06   1.25581915e-06   4.58449357e-04

   2.41267581e-05   2.16784587e-04   5.70207784e-06   7.29765717e-05

   5.12713237e-05   6.54827064e-05   3.48883781e-06   6.72117540e-07

   9.64131607e-05   9.17842865e-05   3.27665807e-06   4.40428700e-05

   1.38090349e-05   2.31477373e-04   6.14308035e-05   1.10738404e-05

   3.08310172e-05   8.24992376e-05   4.52498817e-04   9.78476816e-04

   3.85267376e-05   3.53167322e-05   1.15289567e-05   3.82699479e-06

   1.29309354e-04   4.16733436e-05   1.47800783e-05   0.00000000e+00

   1.89387053e-04   1.76790371e-05   2.08521002e-06   5.43608033e-05

   1.56922678e-04   

### Plot

![1](1.PNG)

In [26]:
file = open('Clutering.txt','r')
for line in file:
    print(line) 
file.close()

k = 2  Sum of  Error = 16.3187719036

k = 3  Sum of  Error = 15.4194825533

k = 4  Sum of  Error = 14.8892304669

k = 5  Sum of  Error = 14.2703893767

k = 6  Sum of  Error = 13.7811348845

k = 7  Sum of  Error = 13.2733313838

k = 8  Sum of  Error = 12.9732788457

k = 9  Sum of  Error = 12.7694618161

k = 10  Sum of  Error = 12.1565061275

k = 11  Sum of  Error = 12.0395908905

k = 12  Sum of  Error = 11.8582970639



## Task 2:

* includes 10 clusters of plants

* Because using binary features as above didn't have a good performance
* I use the count instead of binary occurance

### Code:

In [None]:
from pyspark.mllib.clustering import KMeans, KMeansModel
from numpy import array
from math import sqrt
from operator import add
import numpy as np

from pyspark import SparkContext
sc = SparkContext(appName="k-mean")

###################################################### Load and prepare the data
data = sc.textFile("......")
#print(data.take(10))


# get the discription and all states info
f = open(".......",'r')
states = []
for line in f:
    state = line.split(' ')
    states.append(state[0])
f.close()
states.sort()
#print(states)

def vectorization(plants):
    line = plants.split(',')
    vector = []
    vector.append(line[0])
    for i in range(len(states)):
        if states[i] in line[1:]:
            vector.append(float(1))
        else:
            vector.append(float(0))
    return vector[1:]
dataVector = data.map(vectorization)
#print(dataVector.take(10))


############################################################# normalization
countState = []
for i in range(len(states)):
    print('-------------- ' + str(i) + '/' + str(len(states)) + ' --------------')
    countState.append(dataVector.map(lambda x: x[i]).reduce(add))
countStates = np.array(countState)

normalizedVector = dataVector.map(lambda x: x/countStates).map(lambda x : x[~np.isnan(x)])

#print(normalizedVector.take(10))

# build the model (cluster the data)
# function to evaluate clustering by computing Within Set Sum of Squared Errors
def error(point):
    center = clusters.centers[clusters.predict(point)]   
    return  sqrt(sum([x**2 for x in (point - center)]))
    
#print(parsedData.map(error).collect() )

k = 10
bestSSE = np.inf
# for each k, try 10 times to reduce local optimal effect
     
for ite in range(10):
  clusters = KMeans.train(normalizedVector, k, maxIterations=30, runs=30, initializationMode="random")
  WSSSE = normalizedVector.map(lambda point: error(point)).reduce(lambda x, y: x + y)
  print("k = "+str(k)+"  Sum of  Error = " + str(WSSSE))
  if WSSSE < bestSSE:
    bestSSE = WSSSE
    bestModel = clusters
       
print("----------flower clustering done------------")       

def getCluster(point):
    cluster = bestModel.predict(point)
    return [tuple(point),cluster]
Results = normalizedVector.map(getCluster)
centers = bestModel.centers

##################################################### information retrival

def vectorization(plants):
    line = plants.split(',')
    vector = []
    vector.append(line[0])
    for i in range(len(states)):
        if states[i] in line[1:]:
            vector.append(float(1))
        else:
            vector.append(float(0))
    return [vector[1:], line[1:]]
dataVector = data.map(vectorization)
#print(dataVector.take(10))
# normalization

normalizedVector = dataVector.map(lambda x: [x[0]/countStates, x[1]]).\
                map(lambda x : [tuple(x[0][~np.isnan(x[0])]), x[1]])

#print(normalizedVector.take(2))
#print(Results.take(2))
joinData = normalizedVector.join(Results).map(lambda x: x[1])
#print(joinData.take(2))


def toFlat(x):
    returnList = []
    for item in x[0]:
        returnList.append([item, x[1]])
    return returnList

flatJoinData = joinData.flatMap(toFlat).groupByKey()
#print(flatJoinData.take(20))

def newVectorization(state):
    x = list(state[1])
    vector = []
    for i in range(k):
      count = 0
      for j in range(len(x)):
          if i == x[j]:
              count += 1
      vector.append(float(count))
    return vector
newDataVector = flatJoinData.map(newVectorization)
for item in newDataVector.collect():
    print(item)

print("----------generating state vector done------------")  

####################################################################### train clusters
file = open("Clutering2.txt","w")
SSSEList = []
ModelList = []

# try different k clusters
for k in [2, 3, 4, 5, 6, 7, 8]:
  bestSSE = np.inf
# for each k, try 10 times to reduce local optimal effect     
  for ite in range(10):
    clusters = KMeans.train(newDataVector, k, maxIterations=30, runs=30, initializationMode="random")
    WSSSE = newDataVector.map(lambda point: error(point)).reduce(lambda x, y: x + y)
    print("k = "+str(k)+"  Sum of  Error = " + str(WSSSE))
    if WSSSE < bestSSE:
        bestSSE = WSSSE
        bestModel = clusters
  SSSEList.append(bestSSE)
  ModelList.append(bestModel)
  file.write("k = "+str(k)+"  Sum of  Error = " + str(bestSSE) + '\n')
file.close()

sc.stop()

### Plot:

![2](2.PNG)

In [1]:
file = open('Clutering2.txt','r')
for line in file:
    print(line) 
file.close()

k = 2  Sum of  Error = 11992675.5212

k = 3  Sum of  Error = 5922058.19216

k = 4  Sum of  Error = 2423119.45705

k = 5  Sum of  Error = 2112294.93036

k = 6  Sum of  Error = 2098579.92989

k = 7  Sum of  Error = 2096928.56159

