In [1]:
import numpy as np
import itertools
import cv2
import random
import copy
from time import sleep, time
import pickle

### Parâmetros

In [14]:
def initialize_parameters(new):
    params = {}
    params['nlayer'] = 2
    params['wlen'] = 3
    params['wsize'] = params['wlen'] ** 2
    params['dim'] = 56
    params['train_size'] = 30
    params['val_size'] = 10
    params['test_size'] = 10

    if new:
        Wini = np.array([np.nan, 1., np.nan, 1., 1., 1., np.nan, 1., np.nan])
        params['W'] = [Wini.copy() for _ in range(params['nlayer'])]
        params['joint'] = [create_joint(Wini) for _ in range(params['nlayer'])]
    else:
        params['joint'] = np.load('joint.txt', allow_pickle=True)
        params['W'] = np.load('W.txt', allow_pickle=True)

    params['windows_continuos'] = np.load('windows_continuos.txt', allow_pickle=True)

    return params

In [3]:
def create_joint(W):
    Ji=[]
    ni = int(W[~np.isnan(W)].sum())
    for i in itertools.product([0, 1], repeat=ni):
        Ji.append(''.join(np.array(i).astype(str)))
    np.random.seed(0)
    return np.c_[Ji, np.random.randint(2, size=len(Ji))]

In [15]:
params = initialize_parameters(new = False)

In [16]:
nlayer = params['nlayer']
wlen = params['wlen']
wsize = params['wsize']
dim = params['dim']
train_size = params['train_size']
val_size = params['val_size']
test_size = params['test_size']
W = params['W']
joint = params['joint']
windows_continuos = params['windows_continuos']

In [6]:
W

[array([nan,  1., nan,  1.,  1.,  1., nan, nan, nan]),
 array([nan,  1., nan,  1.,  1.,  1.,  1.,  1.,  1.])]

In [7]:
joint

[array([['0000', '1'],
        ['0001', '1'],
        ['0010', '1'],
        ['0011', '0'],
        ['0100', '1'],
        ['0101', '1'],
        ['0110', '0'],
        ['0111', '0'],
        ['1000', '1'],
        ['1001', '0'],
        ['1010', '0'],
        ['1011', '0'],
        ['1100', '0'],
        ['1101', '0'],
        ['1110', '0'],
        ['1111', '0']], dtype='<U11'),
 array([['0000000', '0'],
        ['0000001', '0'],
        ['0000010', '1'],
        ['0000011', '1'],
        ['0000100', '0'],
        ['0000101', '1'],
        ['0000110', '1'],
        ['0000111', '1'],
        ['0001000', '0'],
        ['0001001', '1'],
        ['0001010', '1'],
        ['0001011', '1'],
        ['0001100', '0'],
        ['0001101', '1'],
        ['0001110', '0'],
        ['0001111', '1'],
        ['0010000', '0'],
        ['0010001', '0'],
        ['0010010', '0'],
        ['0010011', '1'],
        ['0010100', '1'],
        ['0010101', '1'],
        ['0010110', '1'],
        ['0010111'

In [8]:
W_hist = np.load('W_hist.txt', allow_pickle=True)

In [10]:
increase = int(round(wlen/2-0.1,0))

In [11]:
def convert_binary(img):
    (T, img_bin) = cv2.threshold(img, 100, 255, cv2.THRESH_BINARY)
    img_bin[(img_bin==0)]=1
    img_bin[(img_bin==255)]=0
    img_bin = img_bin.astype(int)
    return img_bin

In [12]:
train = []
ytrain = []
for img in range(1,train_size+1):
    s = str(img)
    s = s.zfill(2)
    x = cv2.imread('./x/train'+s+'.jpg', cv2.IMREAD_GRAYSCALE)
    y = cv2.imread('./y/train'+s+'.jpg', cv2.IMREAD_GRAYSCALE)
    train.append(convert_binary(x))
    ytrain.append(convert_binary(y))

In [13]:
val = []
yval = []
for img in range(1,val_size+1):
    s = str(img)
    s = s.zfill(2)
    x = cv2.imread('./x/val'+s+'.jpg', cv2.IMREAD_GRAYSCALE)
    y = cv2.imread('./y/val'+s+'.jpg', cv2.IMREAD_GRAYSCALE)
    val.append(convert_binary(x))
    yval.append(convert_binary(y))

In [17]:
test = []
ytest = []
for img in range(1,test_size+1):
    s = str(img)
    s = s.zfill(2)
    x = cv2.imread('./x/test'+s+'.jpg', cv2.IMREAD_GRAYSCALE)
    y = cv2.imread('./y/test'+s+'.jpg', cv2.IMREAD_GRAYSCALE)
    test.append(convert_binary(x))
    ytest.append(convert_binary(y))

In [18]:
train[0].shape, ytrain[0].shape, val[0].shape, yval[0].shape, test[0].shape, ytest[0].shape

((56, 56), (56, 56), (56, 56), (56, 56), (56, 56), (56, 56))

In [19]:
def save_results(Wtrain, Wval, Wtest):
    for img in range(len(Wtrain)):
        s = str(img+1)
        s = s.zfill(2)
        x = copy.deepcopy(Wtrain[img][1])
        x[(x==0)]=255
        x[(x==1)]=0
        cv2.imwrite('./results/train'+s+'.jpg', x)

    for img in range(len(Wval)):
        s = str(img+1)
        s = s.zfill(2)
        x = copy.deepcopy(Wval[img][1])
        x[(x==0)]=255
        x[(x==1)]=0
        cv2.imwrite('./results/val'+s+'.jpg', x)
        
    for img in range(len(Wtest)):
        s = str(img+1)
        s = s.zfill(2)
        x = copy.deepcopy(Wtest[img][1])
        x[(x==0)]=255
        x[(x==1)]=0
        cv2.imwrite('./results/test'+s+'.jpg', x)
    

In [20]:
def save_results_complet(Wtrain, Wval, Wtest):
    for img in range(len(Wtrain)):
        s = str(img+1)
        s = s.zfill(2)
        x = copy.deepcopy(Wtrain[img][0])
        x[(x==0)]=255
        x[(x==1)]=0
        cv2.imwrite('./results_complet/train_op1_'+s+'.jpg', x)
        x = copy.deepcopy(Wtrain[img][1])
        x[(x==0)]=255
        x[(x==1)]=0
        cv2.imwrite('./results_complet/train_op2_'+s+'.jpg', x)

    for img in range(len(Wval)):
        s = str(img+1)
        s = s.zfill(2)
        x = copy.deepcopy(Wval[img][0])
        x[(x==0)]=255
        x[(x==1)]=0
        cv2.imwrite('./results_complet/val_op1_'+s+'.jpg', x)
        x = copy.deepcopy(Wval[img][1])
        x[(x==0)]=255
        x[(x==1)]=0
        cv2.imwrite('./results_complet/val_op2_'+s+'.jpg', x)
        
    for img in range(len(Wtest)):
        s = str(img+1)
        s = s.zfill(2)
        x = copy.deepcopy(Wtest[img][0])
        x[(x==0)]=255
        x[(x==1)]=0
        cv2.imwrite('./results_complet/test_op1_'+s+'.jpg', x)
        x = copy.deepcopy(Wtest[img][1])
        x[(x==0)]=255
        x[(x==1)]=0
        cv2.imwrite('./results_complet/test_op2_'+s+'.jpg', x)
    

In [21]:
def apply_window(x, W_n, j_n):
    Xl = np.c_[np.zeros([x.shape[0], increase], dtype=int), x, np.zeros([x.shape[0], increase], dtype=int)]
    Xl = np.r_[np.zeros([increase, Xl.shape[1]], dtype=int), Xl, np.zeros([increase, Xl.shape[1]], dtype=int)]
    
    z = np.zeros([x.shape[0], x.shape[0]], dtype=int)
    
    for i in range(z.shape[0]):
        for j in range(z.shape[1]):
            p = Xl[i:i+wlen, j:j+wlen].flatten()
            p = p * W_n
            p = (p[~np.isnan(p)].astype(int))
            p = ''.join(p.astype(str))
            
            indices = np.where(j_n[:, 0] == p)
            if indices[0].size > 0 and j_n[indices[0], 1] == '1':
                z[i, j] = 1
                
    return z

In [22]:
def run_window_hood(sample, sample_size, W_current, joint_current, Wlast, layer):
    Wsample = []
    for k in range(sample_size):
        Wsample_k = [] 
        for i in range(nlayer):
            if layer > i:
                Wsample_k.append(Wlast[k][i])
            elif i==0:
                Wsample_k.append(apply_window(sample[k], W_current[i], joint_current[i]))
            else:
                Wsample_k.append(apply_window(Wsample_k[i-1], W_current[i], joint_current[i]))
        Wsample.append(Wsample_k)
    return Wsample

In [16]:
Wtrain = run_window_hood(train, train_size, W, joint, W, 0)

In [17]:
Wval = run_window_hood(val, val_size, W, joint, W, 0)

In [23]:
Wtest = run_window_hood(test, test_size, W, joint, W, 0)

In [24]:
def calculate_error(y, h):
    error = 0
    n_samples = len(y)
    for k in range(n_samples):
        sample_error = np.abs(h[k][-1] - y[k]).sum()
        error += sample_error / (y[k].size * n_samples)
    return error     

In [25]:
def joint_history(joint, nlayer):
    for k in range(nlayer):
        if k==0:
        #joint_temp.append(joint[k][:,1])
            joint_hist = ''.join(joint[k][:,1])
        else:
            joint_hist = joint_hist+''.join(joint[k][:,1])
    return joint_hist


In [26]:
def window_history(W, nlayer, wsize):
    for k in range(nlayer):
        if k==0:
            window_hist = ''.join([''.join(item) for item in np.reshape(W[k], (wsize,)).astype(str)])
        else:
            window_hist = window_hist+''.join([''.join(item) for item in np.reshape(W[k], (wsize,)).astype(str)])
    return window_hist

In [27]:
def get_error_window(W, joint, nlayer, train, train_size, Wtrain, ytrain):
    start = time()
    keep = True
    w_error = calculate_error(ytrain, Wtrain)
    joint_hist = []
    print('----------')
    print('Entrando no get error window')
    print('start error: ', w_error)
    qq = 0
    flg2 = 0
    while keep:
        qq += 1
        #print('Teste ', qq)
        flg = 0
        joint_hist.append(joint_history(joint, nlayer))
        for k in range(nlayer):
            for i in range(len(joint[k])):
                # print('testing layer ', k)
                # print('joint ', joint[k][i][0])
                joint_temp = copy.deepcopy(joint)
                if joint[k][i][1] == '1':
                    joint_temp[k][i][1] = '0'
                else:
                    joint_temp[k][i][1] = '1'
                j_temp = joint_history(joint_temp, nlayer)
                if j_temp not in joint_hist:
                    joint_hist.append(j_temp)
                    W_hood = run_window_hood(train, train_size, W, joint_temp, Wtrain, k)
                    error_hood = calculate_error(ytrain, W_hood)
                    # print('error: ', error_hood)
                    if error_hood < w_error:
                        # print('erro menor')
                        w_error = error_hood
                        # print(w_error)
                        joint_new = copy.deepcopy(joint_temp)
                        Wtrain_new = copy.deepcopy(W_hood)
                        flg = 1
                        flg2 = 1
                    # print('----------')
        if flg == 0:
            #print('Sem erros Menores no teste ', qq)
            keep = False
        else:
            joint = copy.deepcopy(joint_new)
            Wtrain = copy.deepcopy(Wtrain_new)
            #print('error do teste ', qq, ': ', w_error)
            # print('flg = 1 do teste ', qq)

    #print('----------')
    print('end of testing, fl2 = ', flg2)

    if flg2 == 1:
        Wval = run_window_hood(val, val_size, W, joint, W, 0)
    error_val = calculate_error(yval, Wval)
    error = np.array([w_error, error_val])

    #print('-.-.-.-.-.-.-.-.-.-')
    #end = time()
    #print('tempo de execução: {}'.format(end - start))
    save_results(Wtrain, Wval)

    return (joint, error, flg2, Wtrain, Wval)

In [122]:
joint, error, flg, Wtrain, Wval = get_error_window(W, joint, nlayer,train, train_size, Wtrain, ytrain)

----------
Entrando no get error window
start error:  0.14903273809523812
end of testing, fl2 =  1


In [123]:
error

array([0.01852679, 0.01817602])

In [28]:
def save_window(joint, W):

    filename_joint = 'joint.txt'
    pickle.dump(joint, open(filename_joint, 'wb'))

    filename_W = 'W.txt'
    pickle.dump(W, open(filename_W, 'wb'))

In [130]:
save_window(joint, W)

In [29]:
def check_great_neighboors(W, nlayer, wsize, wlen, joint, train, train_size, Wtrain, ytrain, error):
    global w_hist
    flg = 0
    print('\n check_great_neighboors Start')
    
    for k in range(nlayer):
        #print('testing layer ', k)
        nan_idx = np.where(np.isnan(W[k]))[0]
        w_line_temp_base = W[k].copy()
        
        for i in nan_idx:
            W_line_temp = w_line_temp_base.copy()
            W_line_temp[i] = 1
            W_line_temp_NN = W_line_temp.copy()
            W_line_temp_NN[np.isnan(W_line_temp_NN)] = 0
            W_line_temp_NN = W_line_temp_NN.astype(int)
                
            if ''.join(W_line_temp_NN.astype(str)) in windows_continuos:
                #print('W in windows continuos')
                W_temp = W.copy()
                W_temp[k] = W_line_temp
                W_h = window_history(W_temp, nlayer, wsize)

                if W_h not in w_hist['W']:
                    joint_temp = joint.copy()
                    joint_temp[k] = create_joint(W_temp[k])

                    W_train_temp = run_window_hood(train, train_size, W_temp, joint_temp, Wtrain, k)

                    joint_temp, w_error, flg2, W_train_temp, W_val_temp = get_error_window(W_temp, joint_temp, nlayer,train, train_size, W_train_temp, ytrain)
                    #print('new Validation error: ', w_error[1])
                    if w_error[1] < error[1]:
                        #print('erro menor')
                        error_new = w_error
                        joint_new = joint_temp.copy()
                        Wtrain_new = W_train_temp.copy()
                        Wval_new = W_val_temp.copy()
                        W_new = W_temp.copy()
                        save_window(joint_new, W_new)
                        flg = 1
                    #print('----------')

                    w_hist["W"].append(W_h)
                    w_hist["error"].append(w_error)
                    pickle.dump(w_hist, open('W_hist.txt', 'wb'))
            #else:
                #print('W not in window continuos')
    print('flg = ',flg)
    if flg == 1:
        return W_new, joint_new, Wtrain_new, Wval_new, error, flg
    else:
        return W, joint, Wtrain, Wval, error, flg

In [30]:
def check_lesser_neighboors(W, nlayer, wsize, wlen, joint, train, train_size, Wtrain, ytrain, error):
    global w_hist
    flg = 0
    print('\n check_lesser_neighboors Start')
    
    
    for k in range(nlayer):
        #print('testing layer ', k)
        nan_idx = np.where(W[k]==1)[0]
        w_line_temp_base = W[k].copy()
        
        for i in nan_idx:
            W_line_temp = w_line_temp_base.copy()
            W_line_temp[i] = np.nan
            W_line_temp_NN = W_line_temp.copy()
            W_line_temp_NN[np.isnan(W_line_temp_NN)] = 0
            W_line_temp_NN = W_line_temp_NN.astype(int)
            
            if ''.join(W_line_temp_NN.astype(str)) in windows_continuos:
                #print('W in windows continuos')
                W_temp = W.copy()
                W_temp[k] = W_line_temp
                W_h = window_history(W_temp, nlayer, wsize)
                
                if W_h not in w_hist['W']:
                    joint_temp = joint.copy()
                    joint_temp[k] = create_joint(W_temp[k])
                    W_train_temp = run_window_hood(train, train_size, W_temp, joint_temp, Wtrain, k)
                    
                    joint_temp, w_error, flg2, W_train_temp, W_val_temp = get_error_window(W_temp, joint_temp, nlayer,train, train_size, W_train_temp, ytrain)
                    #print('new Validation error: ', w_error[1])
                    if w_error[1] < error[1]:
                        print('error menor')
                        error[0] = w_error[0]
                        error[1] = w_error[1]
                        joint_new = joint_temp.copy()
                        Wtrain_new = W_train_temp.copy()
                        Wval_new = W_val_temp.copy()
                        W_new = W_temp.copy()
                        print('W_new: ', W_new)
                        save_window(joint_new, W_new)
                        flg = 1
                    #print('----------')
                    
                    w_hist["W"].append(W_h)
                    w_hist["error"].append(w_error)
                    pickle.dump(w_hist, open('W_hist.txt', 'wb'))
                #else:
                    #print('W already in history')
            #else:
                #print('W not in window continuos')
    print('flg = ',flg)
    if flg == 1:
        return W_new, joint_new, Wtrain_new, Wval_new, error, flg
    else:
        return W, joint, Wtrain, Wval, error, flg

In [28]:
start = time()
keep = True
print('start Validation error: ', error[1])
print('----------')
qq=0
w_hist = {"W":[],"error":[]}
w_hist["W"].append(window_history(W, nlayer, wsize))
w_hist["error"].append(error)
    
pickle.dump(w_hist, open('W_hist.txt', 'wb'))

while keep:
#for i in range(len(windows_continuos)):
    qq+=1
    print('Teste de Janela ',qq)
    
    W, joint, Wtrain, Wval, error, flg1 = check_lesser_neighboors(W, nlayer, wsize, wlen, joint, train, train_size, Wtrain, ytrain, error)
    W, joint, Wtrain, Wval, error, flg2 = check_great_neighboors(W, nlayer, wsize, wlen, joint, train, train_size, Wtrain, ytrain, error)
    print(error)
    
    if flg1 ==0 & flg2 ==0 :
        keep = False
    else:
        save_window(joint, W)
        save_results(Wtrain, Wval)
        
    if qq>len(windows_continuos):
        keep = False
    
print('end of testing, fl2 = ', flg2)
end = time()
print('tempo de execução: {}'.format(end - start))            


start Validation error:  0.018176020408163265
----------
Teste de Janela  1

 check_lesser_neighboors Start
----------
Entrando no get error window
start error:  0.13271683673469387
end of testing, fl2 =  1
----------
Entrando no get error window
start error:  0.12164115646258505
end of testing, fl2 =  1
----------
Entrando no get error window
start error:  0.11312712585034013
end of testing, fl2 =  1
error menor
W_new:  [array([nan,  1., nan,  1.,  1., nan, nan,  1., nan]), array([nan,  1., nan,  1.,  1.,  1., nan,  1., nan])]
----------
Entrando no get error window
start error:  0.1110863095238095
end of testing, fl2 =  1
error menor
W_new:  [array([nan,  1., nan,  1.,  1.,  1., nan, nan, nan]), array([nan,  1., nan,  1.,  1.,  1., nan,  1., nan])]
----------
Entrando no get error window
start error:  0.08596938775510207
end of testing, fl2 =  1
----------
Entrando no get error window
start error:  0.0858843537414966
end of testing, fl2 =  1
----------
Entrando no get error window
st

In [29]:
save_results(Wtrain, Wval, Wtest)

In [34]:
save_results_complet(Wtrain, Wval, Wtest)

In [36]:
error

array([0.01743197, 0.01702806])

In [33]:
calculate_error(ytest, Wtest)

0.01728316326530612

In [35]:
W

[array([nan,  1., nan,  1.,  1.,  1., nan, nan, nan]),
 array([nan,  1., nan,  1.,  1.,  1.,  1.,  1.,  1.])]

In [41]:
len(w_hist['W'])

32