# 로컬에서 학습시키는 스크립트

In [1]:
import cv2
import tensorflow as tf
import h5py
import numpy as np
import time

In [2]:
class CNN:
    def __init__(self, h, w, sess):
        self.size_h = h
        self.size_w = w
        self.sess = sess
        self.model = self.make_model()
    
    def make_model(self):
        # 모델
        self.observation = tf.placeholder(shape=[None, self.size_h, self.size_w, 1], dtype=tf.float32) # 이미지 데이터
        self.label = tf.placeholder(shape=[None, 3], dtype=tf.int16) # 라벨 데이터
        
        self.w_in = tf.Variable(tf.random_normal([5,5,1,8], stddev=.01)) # conv 1 가중치
        self.l1 = tf.nn.conv2d(self.observation, self.w_in, strides=[1,1,1,1], padding='SAME')
        self.l1 = tf.nn.relu(self.l1)
        self.l1 = tf.nn.max_pool(self.l1, ksize=[1,2,2,1], strides=[1,2,2,1], padding='SAME')
        
        self.w_out = tf.Variable(tf.random_normal(shape=[self.size_w//2*self.size_h//2*8, 3], stddev=.01))
        self.b = tf.Variable(tf.random_normal([3]))
        self.h_flat = tf.reshape(self.l1, [-1, self.size_w//2*self.size_h//2*8])
        
        self.output = tf.matmul(self.h_flat, self.w_out) + self.b
        self.cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits_v2(logits=self.output, labels=self.label))
        self.optimizer = tf.train.AdamOptimizer(learning_rate=.001).minimize(self.cost)
        print('> 모델 생성 완료')
        
    def train(self, batch_in, batch_label):
        # 학습시행.
        _, cost = self.sess.run([self.optimizer, self.cost], feed_dict={self.observation:batch_in, self.label:batch_label})
        return cost
    
    def policy(self, input_data):
        # policy 만 도출하기
        output = self.sess.run(self.output, feed_dict={self.observation:input_data})
        return output
    
    def test(self, in_test, label_test):
        # 길을 잘 인식하는지 테스트.
        # 테스트 데이터 셋을 미리 저장해놓고 검증하는 메서드
        in_test_copied = copy.deepcopy(in_test)
        cnt = 0
        for step in range(len(in_test_copied)):
            key = cv2.waitKey(10) & 0xFF
            if(key == ord('q')):
                break
            out = self.sess.run(self.output, feed_dict={self.observation:in_test_copied[step:step+1]})
            idx = np.argmax(out, axis=1)
            one_hot = np.zeros_like(out)
            one_hot[0][idx[0]] = 1
            '''if(np.all(one_hot == label_test[])):
                cnt += 1'''
            cv2.putText(in_test_copied[step], 'command : {}'.format(one_hot[0]), (10, 30), cv2.FONT_HERSHEY_COMPLEX, .3, (0,0,255), 1)
            cv2.imshow('testing..', in_test_copied[step])
        
        cv2.destroyAllWindows()
        accuracy = cnt / len(in_test)
        print('acc :', accuracy)
    
    def test_live(self):
        # 실시간으로 길을 잘 인식하는지 테스트하는 메서드.
        out = self.sess.run(self.output, feed_dict={self.observation:d})


In [84]:
class Env:
    def __init__(self, database, model, sess):
        self.database = database # 외부에 학습데이터 저장해놓기. 데이터는 소중하니까
        self.model = model # CNN 모델
        self.sess = sess # session 객체
        print('> Env 생성')
    
    def reset(self, image, ridar=None):
        # 터틀봇으로부터 카메라 데이터 및 라이다 데이터를 수신하여 반환하는 메서드
        # 동시에 초기화
        # ** 통신필요 **
        # recv 함수들 모두 인자 수정해야함!!
        observation = image
        print('> Env started!')
        return observation
  
    def recv_data(self, image, ridar=None):
        # 터틀봇으로부터 카메라 데이터 및 라이다 데이터를 수신하여 반환하는 메서드
        # ** 통신필요 **
        observation = image
        return observation
    
    def get_dataset(self, observation, label):
        # 외부 데이터베이스에 현재상태에 대한 데이터를 제공하기위한 메서드
        return observation, label
    
    def step(self, processed):
        # 메인 루프.
        # 영상획득 - 전처리(외부로부터 진행) - 개입여부 확인 - 개입했다면 데이터 추가하고 개입한대로 action 수행
        #                                   - 아니라면 도출된 policy 결과로 action 수행
      
        self.send_data(action) # 터틀봇에 action 을 송신한다.
        
        image_original = self.recv_data(0)
        return image_original

In [3]:
with h5py.File('dataset.h5', 'r') as f: # binary 파일로 학습데이터 다루기
    batch_in = np.array(f['observation'])
    batch_in = np.expand_dims(batch_in, axis=3)
    #batch_in.dtype = np.uint8
    batch_label = np.array(f['label'])
    print('> 사전학습 데이터 load & setting 완료')
    print(batch_in.shape)
    #print(list(batch_in[10]), batch_label[10])


> 사전학습 데이터 load & setting 완료
(609, 160, 640, 1)


In [9]:
# for Debug
for dat in batch_in:
    cv2.imshow('s', dat)
    if(cv2.waitKey(0) == ord('q')):
        break

cv2.destroyAllWindows()

In [5]:
width = 160
height = 640
total_epoch = 10

with tf.Session() as sess:
    start = time.time()
    model = CNN(width, height, sess)
    sess.run(tf.global_variables_initializer())
    saver = tf.train.Saver()
    print('> 데이터 학습 중..')
    for epoch in range(total_epoch):
        start_epoch = time.time()
        cost = model.train(batch_in, batch_label)
        print('{}/{} epoch, 손실크기 : {}, {} 초 소요'.format(epoch+1, total_epoch, cost, time.time() - start_epoch))
        
    save_path = 'C:\\model'
    saver.save(sess, save_path) # 모델 저장
    took_time = time.time() - start
    print('> 학습완료, {} 초 소요, 모델의 파라미터가 {} 에 저장됨.'.format(took_time, save_path))

> 모델 생성 완료
> 데이터 학습 중..
1/10 epoch, 손실크기 : 17.606658935546875, 905.7519116401672 초 소요


KeyboardInterrupt: 