---
# Spatial Pooler
- SDR (Sparse Destribution Representation, 희소 분포 표상) 생성
- 입력에 대해 pooler 를 이루고 있는 column 들의 일부 (=~2% 내외) 만 활성화 한다
- 모든 column 이 비슷한 빈도로 active 되어야함
- 모든 column 의 synapse 들이 비슷한 빈도로 active 되어야함

---

In [1]:
import numpy as np
import collections
import matplotlib.pyplot as plt
import math

In [25]:
class SpatialPooler:
    def __init__(self, input_size, columns=100, conPerm=.5, minOver=5):
        self.input_size = input_size                # input vector 크기
        self.input_data = np.empty([self.input_size])
        self.columnCount = columns                           # column 의 가로 크기
        self.connectedPerm = conPerm                     # synapse 활성화(1) 될 permanence 임계치
        self.min_overlap = minOver # 발화 하기 위한 컬럼 당 최소한의 overlap count
        self.minGlobalActivity = 0                  # 승리하기 위해 필요한 score (global inhibition)
        self.desiredGlobalActivity = int(0.05 * self.columnCount) # 한 번에 승리할 column 수 (global inhibition)
        self.minDutyCycle = 0                       # column 당 최소 발화 duty
        self.highDutyCycle = 0
        self.permanence_inc = .01                   # 학습시 permanence 증가량
        self.permanence_dec = .01                   # 학습시 permanence 감소량
        self.history_capacity = 50
        self.step = 0                               # 데이터 처리한 수
        
        self.columns = np.random.rand(self.input_size, self.columnCount) # permanence ndarry. 초기화 필요      
        self.columns_avail = np.zeros([self.input_size, self.columnCount]) # 모든 셀들의 연결 유무 정보
        self.boosts = np.ones([self.columnCount])                    # 보정에 필요한 boost 
        self.overlapped = np.zeros([self.columnCount])                 # input 과 연결된 synapse 들과의 최초 계산
        self.activeColumns = np.zeros([self.columnCount])
        self.activeHistory = []                                                    # active duty 를 계산하기 위한 active 기록
        self.overlapHistory = []                                                   # overlap duty 를 계산하기 위한 overlap 기록
        self.activeDutyInfo = np.zeros([self.columnCount])             # active duty 정보
        self.overlapDutyInfo = np.zeros([self.columnCount])            # overlap duty 정보
        
        ## duty 계산을 위한 history 생성 ##
        for c in range(self.columnCount):
            self.activeHistory.append(collections.deque())
            self.overlapHistory.append(collections.deque())
            
                
    ''' SDR 생성 '''
    def compute_SDR(self, input_data):
        
        self.input_data = input_data
        
        ## 1. overlaping ##
        
        self.columns_avail = self.columns > self.connectedPerm
        self.overlapped = self.input_data @ self.columns_avail
        #print(self.overlapped)
        
        for c in range(self.columnCount):
            if(self.overlapped[c] > self.min_overlap):
                self.overlapped[c] *= self.boosts[c]

                if(len(self.overlapHistory[c]) >= self.history_capacity):
                    self.overlapHistory[c].popleft()

                self.overlapHistory[c].append(True)

            else:
                self.overlapped[c] = 0

                if(len(self.overlapHistory[c]) >= self.history_capacity):
                    self.overlapHistory[c].popleft()

                self.overlapHistory[c].append(False)
                    
                    
        ## 2. inhibition (global) ##
        
        self.minGlobalActivity = self.kthScore(self.desiredGlobalActivity)
        self.activeColumns = self.overlapped > self.minGlobalActivity
        
        for c in range(self.columnCount):                
            if(len(self.activeHistory[c]) >= self.history_capacity):
                self.activeHistory[c].popleft()

            self.activeHistory[c].append(self.activeColumns[c])
                    
                
        ## 3. learning ## 
        for c in range(self.columnCount):
                
            if self.activeColumns[c] == 1:
                for s in range(self.input_size):

                    if(self.columns_avail[s, c] == 1):
                        if(self.input_data[s] == 1):
                            self.columns[s, c] += self.permanence_inc
                            self.columns[s, c] = min(self.columns[s, c], 1.0)
                        else:
                            self.columns[s, c] -= self.permanence_dec
                            self.columns[s, c] = max(self.columns[s, c], 0.0)
                                            
                    
        ## 3.2. 보정 작업 ##
        self.update_activeDuty()
        self.update_overlapDuty()
        self.step += 1
        
        for c in range(self.columnCount):
            ## 자주 승리하지 못하는 column 에 대하여 잘 발화할 수 있도록 boost 시켜줌
            self.minDutyCycle = .1 * self.maxDutyCycle()
            self.highDutyCycle = 2 * self.minDutyCycle
            #print("min :", self.minDutyCycle)
            self.boostFunction(c, .01)
            #print(self.boosts)

            ## input 과 잘 겹치지 않는 synapse 에 대해서 permanence 증가시켜줌
            if(self.overlapDutyInfo[c] < self.minDutyCycle):
                self.increase_Permanence(c)
                #print("min", self.minDutyCycle)
                    
    def getActiveColumns(self):
        return self.activeColumns
                        
                        
    ''' global 하게 승리할 컬럼의 기준 '''
    def kthScore(self, desired_kth):
        
        rank = self.overlapped.ravel().copy()
        rank.sort()        
        score = rank[-desired_kth]
        
        return score
    
    
    ''' global 하게 가장 자주 승리한 컬럼의 duty '''
    def maxDutyCycle(self):
        
        rank = self.activeDutyInfo.ravel().copy()
        rank.sort()
        maxDuty = rank[-1]
        
        return maxDuty
    
    
    ''' 해당 column 이 발화하도록 격려 '''
    def boostFunction(self, c, boost):
        if(self.activeDutyInfo[c] <= self.minDutyCycle):
            self.boosts[c] += boost
        elif(self.activeDutyInfo[c] > self.highDutyCycle):
            self.boosts[c] -= boost
            
            
    ''' 해당 column 의 모든 셀의 synapse 의 permanence 를 증가시켜 잘 겹치도록 격려 '''
    def increase_Permanence(self, c):
        self.columns[:, c] += self.permanence_inc
        
        
    ''' activeDuty update '''
    def update_activeDuty(self):
        for c in range(self.columnCount):
            self.activeDutyInfo[c] = np.sum(self.activeHistory[c]) / self.history_capacity

                
    ''' overlapDuty update '''
    def update_overlapDuty(self):
        for c in range(self.columnCount):
            self.overlapDutyInfo[c] = np.sum(self.overlapHistory[c]) / self.history_capacity
    
    def visualize_SDR(self):
        #plt.imshow(self.activeColumns)
        #plt.subplots_adjust(bottom=0.1, right=0.8, top=0.9)
        #cax = plt.axes([0.85, 0.1, 0.075, 0.8])
        #plt.colorbar(cax=cax)
        #plt.show()
        
        sparsity = (np.count_nonzero(self.activeColumns==True)/(self.columnCount))
        return sparsity

In [None]:
#-- 1. sort 1열로 됨?
#-- 2. 행렬 곱 테스트 3d array
#-- 3. KthScore 정렬 잘 될까
# 4. 학습과 보정이 잘 진행되는지 확인
# 5. 제대로 된 data 주입해보기
# 같은 데이터를 집어넣으니까 보정 작업에서 발산한다.
# 그것보다 duty 계산 보정을 해야한다.
# 6. activeDuty 보수하기

import random

np.set_printoptions(threshold=np.nan)

sp = SpatialPooler(10,1024,.8,3)
        
step = 2000
on_cnt = 5
rand_data = np.zeros([10])

y=np.empty([step])

for s in range(step):
    rand_data = np.zeros([10])
    
    for i in range(on_cnt):
        idx = random.randint(0,9)
        
        while(True):
            if(rand_data[idx] == 1):
                idx = random.randint(0,9)
            else:
                break
                
        rand_data[idx] = 1
        
    sp.compute_SDR(rand_data)
    y[s] = sp.visualize_SDR()
    #y.append(sp.visualize_SDR())
    
plt.plot(range(step), y)

print("boost : \n", sp.boosts)
print("active : \n", sp.activeDutyInfo)
print("overlap : \n", sp.overlapDutyInfo)

print("-"*50)
print("\n")
print("sparsity 평균 : {}, 표준편차 : {}\n".format(np.mean(y), np.std(y)))
print("activeDuty 평균 : {}, 표준편차 : {}\n".format(np.mean(sp.activeDutyInfo), np.std(sp.activeDutyInfo)))
print("overlap 평균 : {}, 표준편차 : {}\n".format(np.mean(sp.overlapDutyInfo), np.std(sp.overlapDutyInfo)))