In [1]:
import os
import cv2
import numpy as np
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import OneHotEncoder
from numpy import array
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
from copy import deepcopy
import utils

def reduce_var(x, axis=None, keepdims=False):
    """Variance of a tensor, along the specified axis.

    This is the population variance: the mean of the squared deviations
    from the mean (no Bessel correction is applied).

    # Arguments
        x: A tensor or variable.
        axis: The axis (or axes) to compute the variance over; forwarded
            unchanged to `tf.reduce_mean`. `None` (the default) reduces
            over every dimension.
        keepdims: A boolean, whether to keep the dimensions or not.
            If `keepdims` is `False`, the reduced dimensions are dropped.
            If `keepdims` is `True`, the reduced dimensions are retained
            with length 1.

    # Returns
        A tensor with the variance of elements of `x`.
    """
    # The mean always keeps its reduced axes (length 1) so that it
    # broadcasts against `x` in the subtraction below, whatever the caller
    # asked for in `keepdims`.
    # `keepdims` replaces the deprecated `keep_dims` keyword.
    mean = tf.reduce_mean(x, axis=axis, keepdims=True)
    squared_deviations = tf.square(x - mean)
    return tf.reduce_mean(squared_deviations, axis=axis, keepdims=keepdims)

def reduce_std(x, axis=None, keepdims=False):
    """Standard deviation of a tensor, along the specified axis.

    Computed as the square root of `reduce_var` called with the same
    arguments.

    # Arguments
        x: A tensor or variable.
        axis: The axis to compute the standard deviation over.
        keepdims: A boolean. When `False` the reduced dimension is
            dropped; when `True` it is retained with length 1.

    # Returns
        A tensor with the standard deviation of elements of `x`.
    """
    variance = reduce_var(x, axis=axis, keepdims=keepdims)
    return tf.sqrt(variance)
 
def _load_grayscale_images(image_dir, height=200, width=200):
    """Read every file in `image_dir` as a grayscale image tensor.

    # Arguments
        image_dir: Directory whose entries are all image files.
        height: Expected image height in pixels.
        width: Expected image width in pixels.

    # Returns
        A float32 NumPy array of shape (num_images, height, width, 1)
        with pixel values divided by 255.

    # Raises
        ValueError: If a file cannot be decoded by OpenCV or does not have
            the expected height and width.
    """
    images = []
    # os.listdir returns entries in arbitrary order. Sort them so the image
    # order is deterministic, because the rows are later paired by position
    # with the targets read from the CSV files.
    for file_name in sorted(os.listdir(image_dir)):
        image_path = os.path.join(image_dir, file_name)
        image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
        # cv2.imread signals failure by returning None instead of raising.
        if image is None:
            raise ValueError('Could not read image file: ' + image_path)
        # A blind reshape would silently re-slice images of another size.
        if image.shape != (height, width):
            raise ValueError('Unexpected image shape %s for file: %s'
                             % (image.shape, image_path))
        images.append(image)
    batch = np.array(images).astype(np.float32) / 255.0
    # Add the trailing channel axis expected by the convolution layers.
    return np.reshape(batch, (-1, height, width, 1))


TRAIN_DIR = 'C:/Jupyter_project/DL/trainingSet/'
x_train = _load_grayscale_images(TRAIN_DIR)

TEST_DIR = 'C:/Jupyter_project/DL/testSet/'
x_test = _load_grayscale_images(TEST_DIR)

#np.save("train_data.npy", x_train)
#np.save("test_input.npy",x_test)

# IPython magic: render matplotlib figures inline in the notebook output.
# Comment this line out when running the file as a regular Python script.
%matplotlib inline  


""" Hyperparameter """
# Nominal dataset sizes and batch size. None of the three is referenced
# again in the visible notebook cells.
data_size_train, data_size_test = 1100, 230
batch_size = 1

# Step size for plain gradient descent.
lr = 1e-6

# Number of passes of the training loop (the first pass only evaluates).
epoch = 10

""" Data Loading """

# Bundle both image arrays into one tuple. `data` is not read again in the
# visible notebook cells.
data = x_train, x_test


# --- Graph construction ---
# Set the graph-level random seed before any variables are created.
tf.random.set_random_seed(325)

# Input placeholder: a batch of 200x200 single-channel images.
x = tf.placeholder(dtype=tf.float32, shape=(None, 200, 200, 1), name='x')

# weights and layers #################################################################

# convolution layer 1
# input and input.shape:   x,     (None, 200, 200, 1)
# output and output.shape: conv1, (None, 100, 100, 32)
conv1_W = tf.get_variable(
    name="conv1_W",
    shape=(3, 3, 1, 32),
    initializer=tf.truncated_normal_initializer(stddev=0.1),
)

# conv: filter the input with kernel W. The two middle 1s in `strides` mean
# the kernel moves one pixel at a time along each spatial axis.
conv1 = tf.nn.conv2d(
    x, conv1_W, strides=(1, 1, 1, 1), padding='SAME'
)  # (None, 200, 200, 32)

# relu: positive values pass through unchanged, negative values become 0.
conv1 = tf.nn.relu(conv1)  # (None, 200, 200, 32)

# max_pool: output the maximum of each 2x2 region.
conv1 = tf.nn.max_pool(
    conv1, ksize=(1, 2, 2, 1), strides=(1, 2, 2, 1), padding='SAME'
)  # (None, 100, 100, 32)

# convolution layer 2
# input and input.shape:   conv1, (None, 100, 100, 32)
# output and output.shape: conv2, (None, 50, 50, 64)
conv2_W = tf.get_variable(
    name="conv2_W",
    shape=(3, 3, 32, 64),
    initializer=tf.truncated_normal_initializer(stddev=0.1),
)

# Stride-1 'SAME' convolution keeps the 100x100 spatial size.
conv2 = tf.nn.conv2d(
    conv1, conv2_W, strides=(1, 1, 1, 1), padding='SAME'
)  # (None, 100, 100, 64)
conv2 = tf.nn.relu(conv2)  # (None, 100, 100, 64)

# 2x2 max pooling with stride 2 halves each spatial dimension.
conv2 = tf.nn.max_pool(
    conv2, ksize=(1, 2, 2, 1), strides=(1, 2, 2, 1), padding='SAME'
)  # (None, 50, 50, 64)

# fully connected layer
# input and input.shape:   conv2, (None, 50, 50, 64)
# output and output.shape: fc,    (None, 256)

# Collapse each 50x50x64 feature volume into a single row vector.
flatten = tf.reshape(conv2, (-1, 50 * 50 * 64))  # (None, 160000)

fc_W = tf.get_variable(
    name="fc_W",
    shape=(50 * 50 * 64, 256),
    initializer=tf.truncated_normal_initializer(stddev=0.1),
)

# Linear projection (no bias term) followed by ReLU.
fc = tf.nn.relu(tf.matmul(flatten, fc_W))  # (None, 256)

# output layer
# input and input.shape:   fc,       (None, 256)
# output and output.shape: cnnout_x, (None, 1)
out_W = tf.get_variable(
    name="out_W",
    shape=(256, 1),
    initializer=tf.truncated_normal_initializer(stddev=0.1),
)
cnnout_x = fc @ out_W  # (None, 1)

# Standardise the scalar CNN output using the mean and standard deviation
# taken over all elements of the batch that is fed in.
outx_mean = tf.reduce_mean(cnnout_x)
outx_std = reduce_std(cnnout_x)
cnnout_x = (cnnout_x - outx_mean) / outx_std

# weights and layers #################################################################
# Re-seed NumPy and TensorFlow.
# NOTE(review): the layer variables are defined before this point, so this
# seed probably does not influence their initializers. Confirm.
np.random.seed(0)
tf.set_random_seed(0)

# Training targets: z-score every column of the CSV with its own mean and
# standard deviation, then take the `stiffness` column as a column vector.
df = pd.read_csv('stiffness_training.csv')
df = (df - df.mean()) / df.std()
y_train = df['stiffness'].values.astype(np.float32).reshape(-1, 1)  # (N_train, 1)

# Test targets: same treatment, using the test file's own statistics.
df2 = pd.read_csv('stiffness_test.csv')
df2 = (df2 - df2.mean()) / df2.std()
y_test = df2['stiffness'].values.astype(np.float32).reshape(-1, 1)  # (N_test, 1)

# Placeholder for the regression targets, one scalar per sample.
y = tf.placeholder(tf.float32, shape=(None, 1))  # (None, 1)

# Linear read-out on top of the standardised CNN output: a scalar intercept
# and a 1x1 slope, both initialised to zero.
alpha = tf.get_variable('alpha', (), dtype=tf.float32, initializer=tf.keras.initializers.constant(0.0))
beta = tf.get_variable('beta', (1, 1), dtype=tf.float32, initializer=tf.keras.initializers.constant(0.0))

y_pred = alpha + cnnout_x @ beta  # (None, 1) = () + (None, 1) @ (1, 1)

# tf.nn.l2_loss returns sum(residual ** 2) / 2 over the whole batch.
loss = tf.nn.l2_loss(y - y_pred)
train = tf.train.GradientDescentOptimizer(learning_rate=lr).minimize(loss)

# Mean relative absolute error. Despite its name, `test_accuracy` is an
# error measure, so lower is better.
error = tf.abs(y - y_pred)
# Divide by |y| rather than y: a negative target would otherwise flip the
# sign of its ratio, and positive and negative ratios would cancel in the
# mean. Targets very close to zero still produce very large ratios.
error_ratio = tf.divide(error, tf.abs(y))
test_accuracy = tf.reduce_mean(error_ratio)

In [None]:
with tf.Session() as sess:

    sess.run(tf.global_variables_initializer())

    # Per-iteration history of the read-out parameters and the loss.
    alpha_trace, beta_trace, loss_trace = [], [], []

    # Full-batch training: the whole training set is fed on every step.
    feed_dict = {x: x_train, y: y_train}
    for i in range(epoch):
        # The first iteration only evaluates, so the traces start with the
        # untrained values. Later iterations also run one optimizer step.
        fetches = [alpha, beta, loss] if i == 0 else [alpha, beta, loss, train]
        alpha_run, beta_run, loss_run = sess.run(fetches, feed_dict=feed_dict)[:3]
        alpha_trace.append(alpha_run)
        beta_trace.append(beta_run[0, 0])
        loss_trace.append(loss_run)
    y_pred_run = sess.run(y_pred, feed_dict=feed_dict)

    # One panel per traced quantity, side by side.
    fig, (ax1, ax2, ax3) = plt.subplots(1, 3)
    panels = (
        (ax1, alpha_trace, 'alpha'),
        (ax2, beta_trace, 'beta'),
        (ax3, loss_trace, 'loss'),
    )
    for axis, trace, title in panels:
        axis.plot(trace)
        axis.set_xlabel('epoch')
        axis.set_title(title)
    plt.tight_layout()
    plt.show()

    # Evaluate the error metric once on the whole test set.
    feed_dict = {x: x_test, y: y_test}
    test_accuracy_temp = sess.run(test_accuracy, feed_dict=feed_dict)
    test_accuracy_list = [test_accuracy_temp]

    print('Test Mean Error: ', np.mean(np.array(test_accuracy_list)))