In [None]:
import numpy as np
import cv2
import math
import numpy as np
import matplotlib.pyplot as plt
import os
import h5py
import tensorflow as tf
import imgaug as ia
import imgaug.augmenters as iaa

In [None]:
from layers import dense_senet

In [None]:
os.environ["CUDA_VISIBLE_DEVICES"] = "0"
gpus = tf.config.experimental.list_physical_devices(device_type='GPU')
cpus = tf.config.experimental.list_physical_devices(device_type='CPU')

# tf.config.experimental.set_visible_devices(devices=cpus[0], device_type='CPU')
# tf.config.experimental.set_visible_devices(devices=gpus[0], device_type='GPU')
# for gpu in gpus:
#     tf.config.experimental.set_memory_growth(device=gpu, enable=True)
    
tf.config.experimental.set_virtual_device_configuration(
    gpus[0],
    [tf.config.experimental.VirtualDeviceConfiguration(memory_limit=6144)])

In [None]:
IMG_HEIGHT, IMG_WIDTH, IMG_CHANNEL = 160, 352, 1
DATA_DIR = 'data/Shandong University/'
CLASSES = 106 * 2 * 3
IMG_PER_CLASS = 6
DATA_LENGTH = CLASSES * IMG_PER_CLASS
CACHE_FILE = 'data/cache.h5'
TRAIN_RATIO = 0.8
MODEL_FILE = 'data/model.h5'

In [None]:
# Prewitt算子/边缘检测
def edge_detectio(grayImage):
    kernelx = np.array([[1, 1, 1], [0, 0, 0], [-1, -1, -1]], dtype=int)
    x = cv2.filter2D(grayImage, cv2.CV_16S, kernelx)
    laplacian = cv2.Laplacian(grayImage,cv2.CV_64F)
    absX= cv2.convertScaleAbs(laplacian)
    absX= cv2.GaussianBlur(absX,(3,3),100)
    h, w=absX.shape
    return absX, h, w

In [None]:
#取边缘检测后边界的点
def get_bound_piont(absX):
    h, w=absX.shape
    ret,absX=cv2.threshold(absX, 0, 255, cv2.THRESH_OTSU)
    t, t1 =0, h-1
    bound_up_x, bound_down_x, bound_up_y, bound_down_y = [], [], [], []
    for i in range (w):
        y_min, y_max = 0, h-1
        for j in range (h):
            if absX[j, i] > ret:
                if j < h // 2 and j > y_min:
                    y_min = j
                if j > h // 2 and j < y_max:
                    y_max = j
                    break
        if t < y_min:
            t=y_min
        if t1 > y_max:
            t1=y_max
        if y_max != h-1:
            bound_up_x.append(i)
            bound_up_y.append(y_max)
        if y_min != 0:
            bound_down_x.append(i)
            bound_down_y.append(y_min)
    t2=(t+t1)/2.
    bound_up_x = np.array(bound_up_x)
    bound_down_x = np.array(bound_down_x)
    bound_up_y = np.array(bound_up_y)
    bound_down_y = np.array(bound_down_y)
    return bound_up_x, bound_down_x, bound_up_y, bound_down_y

In [None]:
def fun2ploy(x,n):
    '''
    数据转化为[x^0,x^1,x^2,...x^n]
    首列变1
    '''
    lens = len(x)
    X = np.ones([1,lens])
    for i in range(1,n):
        X = np.vstack((X,np.power(x,i)))#按行堆叠
    return X

In [None]:
def leastseq_byploy(x,y,ploy_dim):
    '''
    最小二乘求解
    '''
    #散点图
    # plt.scatter(x,y,color="r",marker='o',s = 50)

    X = fun2ploy(x,ploy_dim)
    #直接求解
    Xt = X.transpose()#转置变成列向量
    XXt=X.dot(Xt)#矩阵乘
    XXtInv = np.linalg.inv(XXt)#求逆
    XXtInvX = XXtInv.dot(X)
    coef = XXtInvX.dot(y.T)

    y_est = Xt.dot(coef)

    return y_est,coef

In [None]:
def fitting_line(absX):
    ploy_dim =2#拟合参数个数，即权重数量
    ## 数据准备
    bound_up_x, bound_down_x, bound_up_y, bound_down_y = get_bound_piont(absX)
    x = bound_up_x
    y = bound_up_y
    x1 = bound_down_x
    y1 = bound_down_y
    # # 最小二乘拟合
    [y_est,coef] = leastseq_byploy(x,y,ploy_dim)
    [y_est1,coef1] = leastseq_byploy(x1,y1,ploy_dim)
    # print(coef,coef1)
    #显示拟合结果
    # est_data = plt.plot(x,y_est-5,color="g",linewidth= 2)
    # est_data = plt.plot(x1,y_est1+5,color="g",linewidth= 2)
    b1,k1 = coef
    b2,k2 = coef1
    k = (k1+k2)/2.0
    b = (b1+b2)/2.0
    return k, b

In [None]:
def get_theta(w,k,b):
    w_c = w/2
    h_c = k * w_c + b
    theta =math.atan(k)
    # print(theta)
    return theta,w_c,h_c

In [None]:
def Rotating_picture(img,theta,h_c,w_c):
    h, w,_=img.shape
    matRotate = cv2.getRotationMatrix2D((h_c, w_c), 180*theta/math.pi, 1)
    img1=cv2.warpAffine(img, matRotate, (w, h))
    return img1

In [None]:
def get_point(w_c,h_c,x,y,angle):
    (cX, cY) = (w_c, h_c)
    if (cX-x)== 0:
        theta1=math.atan((cY-y)/1)
        l=1/math.cos(theta1)
    else:
        theta1=math.atan((cY-y)/(cX-x))
        l=(cX-x)/math.cos(theta1)
    # print(h,w)
    new_x = cX-math.cos(angle +theta1)*l
    new_y = cY-math.sin(angle +theta1)*l
    # print(l, math.cos(theta1),cX,cY)
    # print((new_x-cX)**2+(new_y-cY)**2,(x-cX)**2+(y-cY)**2)
    return round(new_x), round(new_y)

In [None]:
def Vertical_boundary(w_c,h_c,bound_down_x, bound_down_y,bound_up_x, bound_up_y,theta):
    locs = list(zip(bound_down_x, bound_down_y))
    locs1 = list(zip(bound_up_x, bound_up_y))
    new_locs = list(map(lambda loc: get_point(w_c,h_c,loc[0], loc[1],   theta), locs))
    new_locs1 = list(map(lambda loc: get_point(w_c,h_c,loc[0], loc[1],   theta), locs1))
    new_locs = list(zip(*new_locs))
    new_locs1 = list(zip(*new_locs1))
    Y_max=int(max(new_locs[1]))
    Y_min=int(min(new_locs1[1]))
    return(Y_max, Y_min)

In [None]:
def ncrease_contrast(img):
    img = cv2.convertScaleAbs(img,alpha=1.5,beta=0)
    return img

In [None]:
def Horizontal_boundary(cropped):
    max_1, max_2=0, 0
    height, width,_ = cropped.shape
    kernel_width = width // 20
    x = [i for i in range(width)]
    light = [0 for i in range(width)]
    for w in range(width - kernel_width): #步长是1
        light[w+kernel_width//2] = np.sum(cropped[:, w: w+kernel_width])
        if w+kernel_width//2>width/3:
            if light[w+kernel_width//2]>light[w+kernel_width//2+1] and light[w+kernel_width//2]>light[w+kernel_width//2-1]:
               if max_1< light[w+kernel_width//2]:
                    max_1=light[w+kernel_width//2]
                    t800=w+kernel_width//2
    cut_1, cut_2=int(t800/3), int(2/3*(width-t800)+t800)
    return cut_1,cut_2

In [None]:
def gray_normalization(img):
    """ 灰度归一化 """
    g1, g2 = np.min(img), np.max(img)
    img = ((img - g1) / (g2 - g1) * 255).astype(np.uint8)
    return img


In [None]:
def img_preprocess(img_path):
    img = cv2.imread(img_path)
    grayImage = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    grayImage = cv2.GaussianBlur(grayImage,(3,3),0)
    absX, h, w=edge_detectio(grayImage)
    bound_up_x, bound_down_x, bound_up_y,       bound_down_y=get_bound_piont(absX)
    k, b=fitting_line(absX)
    theta,w_c,h_c=get_theta(w,k,b)
    img=Rotating_picture(img,theta,h_c,w_c)
    Y_max, Y_min=Vertical_boundary(w_c,h_c,bound_down_x,bound_down_y,bound_up_x,bound_up_y,-theta)
    cropped = img[Y_max:Y_min, :]
    X_min,X_max=Horizontal_boundary(cropped)
    ncrease_contrast(img)
    cropped = img[Y_max:Y_min, X_min:X_max]
    # plt.imshow(cropped) # 裁剪坐标为[y0:y1, x0:x1]
    cropped = cv2.resize(cropped, (IMG_WIDTH, IMG_HEIGHT))
    # 对图片进行灰度归一化
    cropped = gray_normalization(cropped)
    cropped = cv2.cvtColor(cropped, cv2.COLOR_BGR2GRAY)
    return cropped

In [None]:
img_path = 'data/Shandong University/001/left/index_1.bmp'
img = img_preprocess(img_path)
plt.imshow(img, cmap='gray')
plt.show()

In [None]:
print(img.shape)
print(img.dtype)

In [None]:
def get_data():
    data = np.zeros(shape=(DATA_LENGTH, IMG_HEIGHT, IMG_WIDTH, IMG_CHANNEL), dtype=np.uint8)

    for human_id in os.listdir(DATA_DIR):
            human_dir = os.path.join(DATA_DIR, human_id)
            for hand_id in os.listdir(human_dir):
                hand_dir = os.path.join(human_dir, hand_id)
                for finger_name in os.listdir(hand_dir):
                    img_path = os.path.join(hand_dir, finger_name)
                    img = img_preprocess(img_path)
                    finger_type, finger_id = os.path.splitext(finger_name)[0].split('_')
                    idx = (int(human_id) - 1) * 36 + \
                            {'left': 0, 'right': 1}[hand_id] * 18 + \
                            {'index': 0, 'middle': 1, 'ring': 2}[finger_type] * 6 + \
                            int(finger_id) - 1
                    data[idx] = np.expand_dims(img, axis=-1)
                    print('get data {}/{} '.format(idx+1, CLASSES*IMG_PER_CLASS), end='\r')


In [None]:
if not os.path.exists(CACHE_FILE):
    print("未发现处理好的数据文件，正在处理...")

    data = get_data()

    h5f = h5py.File(CACHE_FILE, 'w')
    h5f["X"] = data
    h5f.close()
else:
    h5f = h5py.File(CACHE_FILE, 'r')
    data = h5f["X"][:]
    h5f.close()
    print("发现处理好的数据文件，正在读取...")

In [None]:
data.dtype, data.shape

In [None]:
data = data.reshape((CLASSES, IMG_PER_CLASS, IMG_HEIGHT, IMG_WIDTH, IMG_CHANNEL))
permutation = np.random.permutation(CLASSES)
train_data = data[permutation[: int(TRAIN_RATIO * CLASSES)]]
test_data = data[permutation[int(TRAIN_RATIO * CLASSES): ]]
train_size, test_size = train_data.shape[0], test_data.shape[0]
print(train_data.shape, test_data.shape)

In [None]:
plt.imshow(np.squeeze(data[1][1]), cmap='gray')
plt.show()

In [None]:
sometimes = lambda aug: iaa.Sometimes(0.5, aug)

aug = iaa.Sequential(
    [
        sometimes(iaa.CropAndPad(
            percent=(-0.05, 0.05),
            pad_mode=ia.ALL,
            pad_cval=(0, 255)
        )),
        sometimes(iaa.Affine(
            scale={"x": (0.95, 1.05), "y": (0.95, 1.05)}, # scale images to 80-120% of their size, individually per axis
            translate_percent={"x": (-0.05, 0.05), "y": (-0.05, 0.05)}, # translate by -20 to +20 percent (per axis)
            rotate=(-3, 3), # rotate by -45 to +45 degrees
            shear=(-3, 3), # shear by -16 to +16 degrees
            order=[0, 1], # use nearest neighbour or bilinear interpolation (fast)
            cval=(0, 255), # if mode is constant, use a cval between 0 and 255
            mode=ia.ALL # use any of scikit-image's warping modes (see 2nd image from the top for examples)
        )),
        sometimes(iaa.OneOf([
                    iaa.GaussianBlur((0, 3.0)), # blur images with a sigma between 0 and 3.0
                    iaa.AverageBlur(k=(2, 7)), # blur image using local means with kernel sizes between 2 and 7
                    iaa.MedianBlur(k=(3, 11)), # blur image using local medians with kernel sizes between 2 and 7
        ])),
        sometimes(iaa.Add((-10, 10), per_channel=0)),
        # execute 0 to 5 of the following (less important) augmenters per image
        # don't execute all of them, as that would often be way too strong
#         iaa.SomeOf((0, 1),
#             [
#                 iaa.OneOf([
#                     iaa.GaussianBlur((0, 3.0)), # blur images with a sigma between 0 and 3.0
#                     iaa.AverageBlur(k=(2, 7)), # blur image using local means with kernel sizes between 2 and 7
#                     iaa.MedianBlur(k=(3, 11)), # blur image using local medians with kernel sizes between 2 and 7
#                 ]),
# #                 iaa.Sharpen(alpha=(0, 1.0), lightness=(0.75, 1.5)), # sharpen images
# #                 iaa.AdditiveGaussianNoise(loc=0, scale=(0.0, 0.02*255), per_channel=0), # add gaussian noise to images
#                 iaa.Add((-10, 10), per_channel=0), # change brightness of images (by -10 to 10 of original value)
# #                 iaa.AddToHueAndSaturation((-20, 20)), # change hue and saturation
#             ],
#             random_order=True
#         )
    ],
    random_order=True
)

def amplify(X1, X2):
    data_length = X1.shape[0]
    flip_methods = [np.flipud, np.fliplr, np.flip, lambda _: _]
    method_indices = np.random.choice(len(flip_methods), data_length)
    for idx in range(data_length):
        flip_method = flip_methods[method_indices[idx]]
        X1[idx] = flip_method(X1[idx])
        X2[idx] = flip_method(X2[idx])
    return X1, X2
    

In [None]:
# base_model = tf.keras.applications.DenseNet121(input_shape=(IMG_HEIGHT, IMG_WIDTH, 3), weights='imagenet', include_top=False)
base_model = dense_senet.DenseNet121()
# base_model.trainable = False
input1 = tf.keras.Input(shape=(IMG_HEIGHT, IMG_WIDTH, IMG_CHANNEL))
input2 = tf.keras.Input(shape=(IMG_HEIGHT, IMG_WIDTH, IMG_CHANNEL))

x1, x2 = input1, input2

# 1
# x1 = base_model(x1)
# x2 = base_model(x2)
# x = tf.keras.layers.Concatenate(axis=-1)([x1, x2])
# x = tf.keras.layers.GlobalAveragePooling2D()(x)
# x = tf.keras.layers.Dense(512, activation='relu')(x)
# # x = tf.keras.layers.Dense(512, activation='relu', kernel_regularizer=tf.keras.regularizers.l2(l=0.01))(x)
# # x = tf.keras.layers.Dropout(0.3)(x)
# # x = tf.keras.layers.Dense(128, activation='relu')(x)
# x = tf.keras.layers.Dense(1, activation='sigmoid')(x)

# 2
x1 = base_model(x1)
x2 = base_model(x2)
x = tf.keras.layers.Subtract()([x1, x2])
x = x**2
x = tf.keras.layers.GlobalAveragePooling2D()(x)
x = tf.keras.layers.Dense(128, activation='relu')(x)
x = tf.keras.layers.Dense(1, activation='sigmoid')(x)

# 3
# x1 = base_model(x1)
# x2 = base_model(x2)
# x1 = tf.keras.layers.GlobalAveragePooling2D()(x1)
# x2 = tf.keras.layers.GlobalAveragePooling2D()(x2)
# # x1 = tf.keras.backend.l2_normalize(x1, axis=-1)
# # x2 = tf.keras.backend.l2_normalize(x2, axis=-1)
# x = tf.keras.layers.Subtract()([x1, x2])
# x = x**2
# x = tf.keras.layers.Dense(128, activation='relu')(x)
# x = tf.keras.layers.Dense(1, activation='sigmoid')(x)

outputs = x
model = tf.keras.Model(inputs=[input1, input2], outputs=outputs)
model.summary()

In [None]:
num_epoch = 1000
batch_size = 16
learning_rate = 1e-5

In [None]:
optimizer = tf.keras.optimizers.Adam(learning_rate)
loss_object = tf.keras.losses.BinaryCrossentropy()

train_loss = tf.keras.metrics.Mean(name='train_loss')
train_accuracy = tf.keras.metrics.BinaryAccuracy(name='train_accuracy', dtype=None, threshold=0.5)

test_loss = tf.keras.metrics.Mean(name='test_loss')
test_accuracy = tf.keras.metrics.BinaryAccuracy(name='test_accuracy', dtype=None, threshold=0.5)

In [None]:
def batch_loader(data, batch_size=64):
    data_length, classes, _, _, _ = data.shape
    X1 = np.zeros(shape=(batch_size, IMG_HEIGHT, IMG_WIDTH, IMG_CHANNEL), dtype=np.float32)
    X2 = np.zeros(shape=(batch_size, IMG_HEIGHT, IMG_WIDTH, IMG_CHANNEL), dtype=np.float32)
    y = np.random.randint(2, size=(batch_size, )).astype(np.float32)
    for idx, is_same in enumerate(y):
        if is_same == 1.:
            class_id = np.random.randint(data_length)
            img_id1, img_id2 = np.random.choice(classes, 2, replace=False)
            X1[idx] = data[class_id][img_id1]
            X2[idx] = data[class_id][img_id2]
        else:
            class_id1, class_id2 = np.random.choice(data_length, 2, replace=False)
            img_id1, img_id2 = np.random.randint(classes), np.random.randint(classes)
            X1[idx] = data[class_id1][img_id1]
            X2[idx] = data[class_id2][img_id2]
    return X1, X2, y

In [None]:
X1, X2, y = batch_loader(train_data)

In [None]:
# aug = iaa.AddToHueAndSaturation((-20, 20))

X1, X2 = amplify(X1, X2)

idx = 15
plt.imshow(X1[idx].reshape(IMG_HEIGHT, IMG_WIDTH).astype(np.uint8), cmap='gray')
plt.show()
plt.imshow(X2[idx].reshape(IMG_HEIGHT, IMG_WIDTH).astype(np.uint8), cmap='gray')
plt.show()
print(y[idx])
print(X1[idx].astype(np.uint8).shape)

plt.imshow(aug(images=[X1[idx].astype(np.uint8)])[0].reshape(IMG_HEIGHT, IMG_WIDTH), cmap='gray')
plt.show()
plt.imshow(aug(images=[X2[idx].astype(np.uint8)])[0].reshape(IMG_HEIGHT, IMG_WIDTH), cmap='gray')
plt.show()

In [None]:
@tf.function
def train_on_batch(X1, X2, y):
    with tf.GradientTape() as tape:
        y_pred = model([X1, X2])
        loss = loss_object(y_true=y, y_pred=y_pred)
        loss = tf.reduce_mean(loss)
    grads = tape.gradient(loss, model.variables)
    optimizer.apply_gradients(grads_and_vars=zip(grads, model.variables))
    
    train_loss(loss)
    train_accuracy(y, y_pred)
    return loss

@tf.function
def test_on_batch(X1, X2, y):
    y_pred = model([X1, X2])
    t_loss = loss_object(y, y_pred)
    
    test_loss(t_loss)
    test_accuracy(y, y_pred)
    return t_loss

for epoch in range(num_epoch):
    
    train_loss.reset_states()
    train_accuracy.reset_states()
    test_loss.reset_states()
    test_accuracy.reset_states()
    
    for batch_index in range(train_size // batch_size):
        X1, X2, y = batch_loader(train_data, batch_size=batch_size)
        X1, X2 = amplify(X1, X2)
        X1 = np.array(aug(images=X1.astype(np.uint8)), dtype=np.float32)
        X2 = np.array(aug(images=X2.astype(np.uint8)), dtype=np.float32)
        X1 /= 255.
        X2 /= 255.
        loss = train_on_batch(X1, X2, y)
        template = '[Training] Epoch {}, Batch {}/{}, Loss: {}, Accuracy: {:.2%} '
        print(template.format(epoch+1,
                              batch_index,
                              train_size // batch_size,
                              loss,
                              train_accuracy.result()),
             end='\r')

    for batch_index in range(5*test_size // batch_size):
        X1, X2, y = batch_loader(test_data, batch_size=batch_size)
        X1 /= 255.
        X2 /= 255.
        loss = test_on_batch(X1, X2, y)
        template = '[Testing] Epoch {}, Batch {}/{}, Loss: {}, Accuracy: {:.2%} '
        print(template.format(epoch+1,
                              batch_index,
                              test_size // batch_size,
                              loss,
                              test_accuracy.result()),
             end='\r')
        
    template = 'Epoch {}, Loss: {}, Accuracy: {:.2%}, Test Loss: {}, Test Accuracy: {:.2%} '
    print(template.format(epoch+1,
                         train_loss.result(),
                         train_accuracy.result(),
                         test_loss.result(),
                         test_accuracy.result()))
    
    model.save_weights(MODEL_FILE)