In [1]:
import findspark
findspark.init()

In [2]:
import pyspark
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("GKA_test").setMaster("local[*]")
sc = SparkContext(conf=conf)

In [3]:
import pandas as pd
import numpy as np
import random
import math
import time
import matplotlib.colors as colors
from sklearn import datasets
from sklearn.preprocessing import normalize
from sklearn.metrics import davies_bouldin_score
from sklearn.metrics.cluster import adjusted_rand_score, silhouette_score

In [4]:
iris = datasets.load_iris()
data = iris['data']
labels = iris['target']
print(data.shape)

(150, 4)


In [5]:
m = data.shape[0]
dim = data.shape[1]
population_size = 5
num_cluster = 3

In [6]:
class Chromosome():
    def __init__(self, data, num_cluster):
        #初始化參數
        self.kmax = num_cluster
        self.data_num = data.shape[0]
        self.dim = data.shape[1]
        self.center = self.init_center(data)
        self.sol = None
    
    #隨機選num_cluster個中心點    
    def init_center(self, data):
        center = []
        selected = random.sample(range(self.data_num), self.kmax)
        for i in range(self.kmax):
            center.append(data[selected[i]])            
        return center
        
    #對於每一個染色體，隨機產生一組解 => 每一個等位基因代表對應的群 => shape=(150, 1)
    def cal_solution(self, rdd):
        self.sol = np.array(rdd.map(lambda p: self.closestPoint(p, self.center)).take(self.data_num))

    def distance(self, a, b):
        return np.sum((a-b)*(a-b))
    
    #這個函數的目的就是求取該點應該分到哪個中心點的集合去，返回的是序號
    def closestPoint(self, p, centers):
        bestIndex = 0
        closest = float("+inf")
        for i in range(len(centers)):
            tempDist = np.sum((p - centers[i]) ** 2)
            if tempDist < closest:
                closest = tempDist
                bestIndex = i
        return bestIndex
    
    def cal_fitness(self, data):
        return silhouette_score(data, self.sol)
    
    def cal_SSE(self):
        sse = 0.0
        for i in range(len(self.sol)): 
            square_error = self.distance(self.data[i], self.center[self.sol[i]])
            sse += square_error
        return sse
    
    def KMO(self, rdd):
        #計算每一筆資料跟行星的距離
        #對所有資料執行map過程，最終生成的是(index, (point, 1))的rdd
        closest = rdd.map(lambda p: (self.closestPoint(p, self.center), (p, 1)))
        sol_tmp = closest.take(self.data_num)
        #執行reduce過程，該過程的目的是重新求取中心點，生成的也是rdd
        pointStats = closest.reduceByKey(lambda p1_c1, p2_c2: (p1_c1[0] + p2_c2[0], p1_c1[1] + p2_c2[1]))
        #生成新的中心點
        newPoints = pointStats.map(lambda st: (st[0], st[1][0] / st[1][1])).collect()
        #計算新舊中心點的距離差
        tempDist = sum(np.sum((self.center[iK] - p) ** 2) for (iK, p) in newPoints)
        #設置新的中心點
        for (iK, p) in newPoints:
            self.center[iK] = p
        #更新分群解
        for i in range(len(sol_tmp)):
            self.sol[i] = sol_tmp[i][0]
        #更新SSE
        #self.SSE = self.cal_SSE()

In [7]:
#適者生存
def selection(chromosomes, Ps, data):
    size = len(chromosomes)
    new_populations = []
        
    #計算個染色體的適應值,並統計存活率
    for i in range(size):
        chromosomes[i].fitness = chromosomes[i].cal_fitness(data)
    #存活率
    print('survival rate:', Ps*100, '%')

    print('Before Selection:')
    chromosomes = sorted(chromosomes, reverse=True, key=lambda elem: elem.fitness)
    for i in range(len(chromosomes)):
        print('chromosome', i, "'s fitness value", chromosomes[i].fitness)

    #找出(存活率*個體數)個適應值的染色體
    #適應值越大越容易存活
    for i in range(int(population_size*Ps)):
        new_populations.append(chromosomes[i])
    
    #填滿染色體數
    while len(new_populations) < size:
        idx = random.randint(0, 4)
        new_populations.append(chromosomes[idx])
        
    print('After Selection:')
    new_populations = sorted(new_populations, reverse=True, key=lambda elem: elem.fitness)
    for i in range(len(new_populations)):
        print('chromosome', i, "'s fitness value", new_populations[i].fitness)
    
    return new_populations

In [8]:
#交配
def crossover(data, chromosomes, Pc):
    numOfInd = len(chromosomes)
    #根據交配得到數量並隨機選出染色體
    index = random.sample(range(0, numOfInd - 1), int(Pc * numOfInd))
    
    new_chromosomes = []
    for i in range(len(index)):  # do how many time
        new_chromosomes = doCrossover(data, chromosomes, i, index)
        
    return new_chromosomes


def doCrossover(data, chromosomes, i, index):
    length = chromosomes[0].sol.shape[0]
    cut = random.randint(1, length - 2)
    #依取樣順序跟隔壁交換基因(每一筆資料的分群) => sol為基因
    parent1 = chromosomes[index[i]]
    parent2 = chromosomes[index[(i + 1) % len(index)] % length]
    child1 = Chromosome(data, num_cluster)
    child2 = Chromosome(data, num_cluster)
        
    p1 = list(parent1.sol)
    p2 = list(parent2.sol)
    c1 = p1[0:cut] + p2[cut:length]
    c2 = p1[cut:length] + p2[0:cut]
    child1.sol = np.array(c1)
    child2.sol = np.array(c2)
        
    # 計算child適應值
    child1.fitness = child1.cal_fitness(data)
    child2.fitness = child2.cal_fitness(data)
        
    #父子兩代在競爭一次,留下適應值大的
    listA = []
    listA.append(parent1)
    listA.append(parent2)
    listA.append(child1)
    listA.append(child2)
    #依適應值反向排序
    listA = sorted(listA, reverse=True, key=lambda elem: elem.fitness)
        
    #留下最大的兩個
    chromosomes[index[i]] = listA[0]
    chromosomes[index[(i + 1) % len(index)] % length] = listA[1]

    return sorted(chromosomes, reverse=True, key=lambda elem: elem.fitness)

In [9]:
fitness_his = []
sse_his = []
ari_his = []
cen_his = []
population = []

In [18]:
rdd = sc.parallelize(data)
rdd.cache()
for i in range(2):
    population.append(Chromosome(data, num_cluster))
    #population[i].center = rdd.takeSample(False, num_cluster, 1)
    population[i].cal_solution(rdd)
    print('<the', i+1, 'th chromosome has been initialized.>')

c_rdd = sc.parallelize(population)
c_rdd.cache()
new_c_rdd = c_rdd.map(lambda chromo: (chromo.KMO(rdd), chromo))

<the 1 th chromosome has been initialized.>
<the 2 th chromosome has been initialized.>


In [17]:
result = new_c_rdd.collect()

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 57.0 failed 1 times, most recent failure: Lost task 6.0 in stage 57.0 (TID 165, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "E:\spark\spark-2.4.3-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 377, in main
  File "E:\spark\spark-2.4.3-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 372, in process
  File "E:\spark\spark-2.4.3-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 393, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "E:\spark\spark-2.4.3-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 147, in load_stream
    yield self._read_with_length(stream)
  File "E:\spark\spark-2.4.3-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 172, in _read_with_length
    return self.loads(obj)
  File "E:\spark\spark-2.4.3-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 580, in loads
    return pickle.loads(obj, encoding=encoding)
AttributeError: Can't get attribute 'Chromosome' on <module 'pyspark.worker' from 'E:\\spark\\spark-2.4.3-bin-hadoop2.7\\python\\lib\\pyspark.zip\\pyspark\\worker.py'>

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:588)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "E:\spark\spark-2.4.3-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 377, in main
  File "E:\spark\spark-2.4.3-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 372, in process
  File "E:\spark\spark-2.4.3-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 393, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "E:\spark\spark-2.4.3-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 147, in load_stream
    yield self._read_with_length(stream)
  File "E:\spark\spark-2.4.3-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 172, in _read_with_length
    return self.loads(obj)
  File "E:\spark\spark-2.4.3-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 580, in loads
    return pickle.loads(obj, encoding=encoding)
AttributeError: Can't get attribute 'Chromosome' on <module 'pyspark.worker' from 'E:\\spark\\spark-2.4.3-bin-hadoop2.7\\python\\lib\\pyspark.zip\\pyspark\\worker.py'>

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:588)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [10]:
tic = time.perf_counter()

rdd = sc.parallelize(data)
rdd.cache()
for i in range(population_size):
    population.append(Chromosome(data, num_cluster))
    #population[i].center = rdd.takeSample(False, num_cluster, 1)
    population[i].cal_solution(rdd)
    print('<the', i+1, 'th chromosome has been initialized.>')

c_rdd = sc.parallelize(population)
c_rdd.cache()
#for each generation
for i in range(3):
    print('[the', i+1, 'th generation]')
    population = selection(population, 0.8, data)
    population = crossover(data, population, 0.8)
        
    print('After Crossover:')
    for i in range(len(population)):
        print('chromosome', i, "'s fitness value", population[i].fitness)
                    
    #for each chromosome
    for j in range(population_size):
        #population[j].mutation()
        population[j].KMO(rdd)
        print('the', j+1, "'s KMO has been finished.")
    #找出最大適應值的染色體
    population = sorted(population, reverse=True, key=lambda elem: elem.fitness)
    ari = adjusted_rand_score(labels, population[0].sol)
    print('Fitness value:', population[0].fitness)
    #print('Sum of Square Error:', population[0].SSE)
    print('Adjusted Rand Index:', ari)
    print('=======================================')

toc = time.perf_counter()
spend_time = str(1000*(toc - tic))
print("Comuptation Time: " + spend_time + "ms")

<the 1 th chromosome has been initialized.>
<the 2 th chromosome has been initialized.>
<the 3 th chromosome has been initialized.>
<the 4 th chromosome has been initialized.>
<the 5 th chromosome has been initialized.>
[the 1 th generation]
survival rate: 80.0 %
Before Selection:
chromosome 0 's fitness value 0.5172890482875971
chromosome 1 's fitness value 0.504827082425721
chromosome 2 's fitness value 0.45074576598168903
chromosome 3 's fitness value 0.427191286691874
chromosome 4 's fitness value 0.24433333080852956
After Selection:
chromosome 0 's fitness value 0.5172890482875971
chromosome 1 's fitness value 0.504827082425721
chromosome 2 's fitness value 0.45074576598168903
chromosome 3 's fitness value 0.427191286691874
chromosome 4 's fitness value 0.427191286691874
After Crossover:
chromosome 0 's fitness value 0.5172890482875971
chromosome 1 's fitness value 0.504827082425721
chromosome 2 's fitness value 0.45074576598168903
chromosome 3 's fitness value 0.427191286691874
c

KeyboardInterrupt: 