In [102]:
import numpy as np

In [103]:
def conv(X,W1,b,conv_param):
    out = None
    N,H,W = X.shape
    F,HH,WW = W1.shape
    pad = conv_param["pad"]
    stride = conv_param["stride"]
    X = np.pad(X, ( (0, 0), (pad, pad),(pad, pad)), 'constant')
    Hn = 1 + int((H + 2 * pad - HH) / stride)
    Wn = 1 + int((W + 2 * pad - WW) / stride)
    out = np.zeros((N,F,Hn,Wn))
    for n in range(N):
        for m in range(F):
            for i in range(Hn):
                for j in range(Wn):
                    data = X[n,i*stride:i*stride+HH,j*stride:j*stride+WW].reshape(1,-1)
                    filt = W1[m].reshape(-1,1)
                    out[n,m,i,j] = data.dot(filt)+b[m]
    return out

In [104]:
def conv_back(dloss,X,W1,b,conv_param):
    N, F, Hn, Wn = dloss.shape
    N, H, W = X.shape
    F, HH,WW= W1.shape
    pad = conv_param["pad"]
    stride = conv_param["stride"]
    dw = np.zeros_like(W1)
    X = np.pad(X,((0,0),(pad,pad),(pad,pad)),'constant')
    for n in range(N):
        for m in range(F):
            for i in range(HH):
                for j in range(WW):
                    dw[m] += X[n,i*stride:i*stride+HH,j*stride:j*stride+WW]*dloss[n,m,i,j]
    db = np.sum(dloss,axis = (0,2,3))
    return dw,db

In [105]:
def Pooling(feature_map,pool_param):
#     print(feature_map.shape)
    N,C,H,W = feature_map.shape
    HH,WW,stride = pool_param['pool_height'],pool_param['pool_width'],pool_param['pool_stride']
    Hn = 1 + int((H - HH) / stride)
    Wn = 1 + int((W - WW) / stride)
    out = np.zeros((N,C,Hn,Wn))
    for i in range(Hn):
        for j in range(Wn):
#             print((np.max(feature_map[...,i*stride:i*stride+HH,j*stride:j*stride+WW],axis=(2,3)).shape))
            out[...,i,j] = np.max(feature_map[...,i*stride:i*stride+HH,j*stride:j*stride+WW],axis=(2,3))
    return out

In [106]:
def Pooling_back(dloss,x,out,pool_param):
    #还原大小
    height,width,stride = pool_param['pool_height'],pool_param['pool_width'],pool_param['pool_stride']
    dx = np.zeros_like(x)
    N,C,H,W = dloss.shape
    for i in range(H):
        for j in range(W):
            mark = x[...,i*stride:i*stride+height,j*stride:j*stride+width]==out[...,i,j][...,np.newaxis,np.newaxis]
            dx[...,i*stride:i*stride+height,j*stride:j*stride+width] = mark*dloss[...,i,j][...,np.newaxis,np.newaxis]
    return dx

In [107]:
def ReLU(feature_map):
    return np.maximum(feature_map,0)

In [108]:
def sigmoid(z):
    return 1/(1+np.exp(-z))

In [121]:
class ThreeLayerCNN(object):
    def __init__(self,input_dim=(32,32),num_filters=32,filter_size=3,hidden_size=100,
                 num_classes=2,reg=0.01,pool_param={'pool_height':2,'pool_stride':2,'pool_width':2},conv_param={}):
        self.reg = reg
        self.params = {}
        self.pool_param = pool_param
        if conv_param:
            self.conv_param = conv_param
        else:
            self.conv_param = {'stride':1,'pad':(filter_size-1)//2}
        
        H,W = input_dim
        Hn = (H-filter_size+2*self.conv_param['pad'])//self.conv_param['stride']+1
        Wn = (H-filter_size+2*self.conv_param['pad'])//self.conv_param['stride']+1
        Hf = (Hn-self.pool_param["pool_height"])//self.pool_param["pool_stride"]+1
        Wf = (Wn-self.pool_param["pool_width"])//self.pool_param["pool_stride"]+1
        
        self.params['W1'] = 0.001*np.random.randn(num_filters,filter_size,filter_size)
        self.params['b1'] = np.zeros(num_filters)
        self.params['W2'] = 0.001*np.random.randn(num_filters*Hf*Wf,hidden_size)
        self.params['b2'] = np.zeros(hidden_size)
        self.params['W3'] = 0.001*np.random.randn(hidden_size,num_classes)
        self.params['b3'] = np.zeros(num_classes)
    def loss(self,X,y=None):
        loss = 0
        grads = {}
        
        N = X.shape[0]
        reg = self.reg
        W1, b1 = self.params['W1'], self.params['b1']
        W2, b2 = self.params['W2'], self.params['b2']
        W3, b3 = self.params['W3'], self.params['b3']
        H1_conv = conv(X,W1,b1,self.conv_param)
        H1_tmp = ReLU(H1_conv)
        H1 = Pooling(H1_tmp,self.pool_param)
        N_,NUM,H,W = H1.shape
        flatten = H1.reshape(H1.shape[0],-1)
        H2 = ReLU(flatten.dot(W2)+b2)
        scores = H2.dot(W3)+b3
        h_x = sigmoid(scores)
        tmp1 = -np.log(h_x).T*y
        tmp1 = tmp1.T
        tmp2 = (1-y)*np.log(1-h_x).T
        tmp2 = tmp2.T
#         print(tmp1.shape,tmp2.shape)
        tmp3 = tmp1-tmp2
        loss = np.sum(tmp1-tmp2)/N
        loss += 0.5*self.reg*(np.sum(W3**2)+np.sum(W2**2)+np.sum(W3**2))
        
        y_ = y.reshape(-1,1)
        dW3 = (np.dot(H2.T,(h_x-y_)))/N + 0.5*reg*W3
        tmp = np.mat(H2.T+0.0001*np.random.rand(H2.shape[0],H2.shape[1]))
        tmp = tmp.I
        tmp = np.array(tmp)
        dscores = tmp.dot(dW3)
        db3 = np.sum(dscores,axis = 0)/N

        dW2_tmp = dscores.dot(dW3.T)
        dW2_tmp[H2==0]=0
        dW2 = flatten.T.dot(dW2_tmp) + 0.5*reg*W2
        db2 = np.sum(dW2_tmp,axis = 0)/N
        
        dflatten = dW2_tmp.dot(W2.T)
        dH1 = dflatten.reshape(N_,NUM,H,W)
        dH1_conv = Pooling_back(dH1,H1_tmp,H1,self.pool_param)
        dre = dH1_conv
        dH1_conv[H1_tmp==0] = 0
        dW1,db1 = conv_back(dH1_conv,X,W1,b1,self.conv_param)
        dW1 += 0.5*reg*W1
        
        grads['W1'],grads['W2'],grads['W3'] = dW1,dW2,dW3
        grads['b1'],grads['b2'],grads['b3'] = db1,db2,db3
        
        return loss, grads
    
    def train(self,X,y,learning_rate=1e-3,batch_size=100,num_iter=10,learning_rate_decay=0.95):
        num_train = X.shape[0]
        loss_history = []
        train_acc_history = []
        iterations_per_epoch = max(1,int(num_train/batch_size))
        
        for it in range(num_iter):
            indices = np.random.choice(num_train,batch_size,replace=True)
            x_batch = X[indices]
            y_batch = y[indices]
            
            loss, grads = self.loss(x_batch, y_batch)
            loss_history.append(loss)
            self.params['W1'] -= learning_rate*grads['W1']
            self.params['W2'] -= learning_rate*grads['W2']
            self.params['W3'] -= learning_rate*grads['W3']
            self.params['b1'] -= learning_rate*grads['b1']
            self.params['b2'] -= learning_rate*grads['b2']
            self.params['b3'] -= learning_rate*grads['b3']
            
            if it % iterations_per_epoch == 0:
                train_acc = (self.predict(x_batch) == y_batch).mean()
                train_acc_history.append(train_acc)
                learning_rate *= learning_rate_decay

        return {'loss_history': loss_history,'train_acc_history': train_acc_history }
                                   
    def predict(self,X):
        W1, b1 = self.params['W1'], self.params['b1']
        W2, b2 = self.params['W2'], self.params['b2']
        W3, b3 = self.params['W3'], self.params['b3']
        tmp = Pooling(ReLU(conv(X,W1,b1,self.conv_param)),self.pool_param)
        N,F,H,W = tmp.shape
        flatten = tmp.reshape(tmp.shape[0],-1)
        H2 = ReLU(flatten.dot(W2)+b2)
        scores = H2.dot(W3)+b3
        pred = np.argmax(scores,axis = 1)
        return pred

In [110]:
import pandas as pd
import cv2

In [111]:
X_file = pd.read_csv("MURA-v1.1/train_image_paths.csv",header = None)
X_valid_file = pd.read_csv("MURA-v1.1/valid_image_paths.csv",header = None)
y_file = pd.read_csv("MURA-v1.1/train_labeled_studies.csv",header = None,index_col=0)
y_valid_file = pd.read_csv("MURA-v1.1/valid_labeled_studies.csv",header = None,index_col=0)

In [124]:
train_size = 1000
test_size = 300
img_size = (32,32)
X = []
y = []
test_X = []
test_y = []
for i in range(train_size):
    file = X_file[0].iloc[i]
    img = cv2.imread(file,0)
    img = cv2.resize(img,img_size)
    file = file[:-10]
    try:
        label = y_file.loc[file,1]
    except :
        pass
    else:
        X.append(img)
        y.append(label)
for i in range(test_size):
    file = X_valid_file[0].iloc[i]
    img = cv2.imread(file,0)
    img = cv2.resize(img,img_size)
    file = file[:-10]
    try:
        label = y_valid_file.loc[file,1]
    except :
        pass
    else:
        test_X.append(img)
        test_y.append(label)

In [125]:
X,y = np.array(X),np.array(y)
test_X,test_y = np.array(test_X),np.array(test_y)
print(X.shape,y.shape,test_X.shape,test_y.shape)

(1000, 32, 32) (1000,) (300, 32, 32) (300,)


In [122]:
net = ThreeLayerCNN()
net.train(X,y)

{'loss_history': [1.390453802507708,
  1.390453494952003,
  1.3904507870868164,
  1.390457979932719,
  1.3904673707811053,
  1.3904657881090239,
  1.3904461973176292,
  1.3904453387982116,
  1.3904312850185916,
  1.3904039933015155],
 'train_acc_history': [0.07]}

In [126]:
pred = net.predict(test_X)


<built-in method mean of numpy.ndarray object at 0x0000018E24CF8B20>


In [127]:
print((pred==test_y).mean())

0.11
