<a href="https://colab.research.google.com/github/Myeong2/ComputerVision_Project3-DrinkDetector/blob/master/Untitled17.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
import random
from collections import deque

import cv2
import gym
import numpy as np
import tensorflow.keras as keras
from gym import spaces
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import load_img, img_to_array

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

class DQNAgent:
    """Deep Q-learning agent with an epsilon-greedy policy and a replay memory.

    The Q-network is a small convolutional model that takes inputs of shape
    ``state_size`` and outputs one linear value per action.
    """

    def __init__(self, state_size, action_size, exploration_rate_initial=1.0, exploration_rate_min=0.01,
                 exploration_decay=0.995):
        """Store hyper-parameters and build the Q-network.

        Args:
            state_size: Shape of a single state; used as ``input_shape`` of
                the first convolution layer.
            action_size: Number of discrete actions (width of the output
                layer and upper bound for random actions).
            exploration_rate_initial: Starting probability of picking a
                random action.
            exploration_rate_min: Lower bound for the exploration rate.
            exploration_decay: Factor the exploration rate is multiplied by
                after each call to ``experience_replay``.
        """
        self.state_size = state_size
        self.action_size = action_size
        # Replay memory; the deque silently drops the oldest transition once
        # 2000 entries are stored.
        self.memory = deque(maxlen=2000)
        # Discount factor applied to the best next-state Q-value.
        self.gamma = 0.95

        # Initial exploration rate, its lower bound, and the decay factor.
        self.exploration_rate_initial = exploration_rate_initial
        self.exploration_rate_min = exploration_rate_min
        self.exploration_decay = exploration_decay

        # Current exploration rate starts at the initial value.
        self.exploration_rate = self.exploration_rate_initial

        self.learning_rate = 0.001
        self.model = self.build_model()

    def build_model(self):
        """Build and compile the convolutional Q-network.

        Returns:
            A compiled ``keras.Sequential`` model with MSE loss and an Adam
            optimizer using ``self.learning_rate``.
        """
        model = keras.Sequential()
        model.add(keras.layers.Conv2D(32, kernel_size=(3, 3), activation='relu', input_shape=self.state_size))
        model.add(keras.layers.MaxPooling2D(pool_size=(2, 2)))
        model.add(keras.layers.Conv2D(64, kernel_size=(3, 3), activation='relu'))
        model.add(keras.layers.MaxPooling2D(pool_size=(2, 2)))
        model.add(keras.layers.Flatten())
        model.add(keras.layers.Dense(64, activation='relu'))
        model.add(keras.layers.Dense(self.action_size, activation='linear'))
        # `learning_rate` is the supported keyword; the old `lr` alias is
        # deprecated and rejected by newer Keras releases.
        model.compile(loss='mse', optimizer=keras.optimizers.Adam(learning_rate=self.learning_rate))
        return model

    def remember(self, state, action, next_state, reward, done):
        """Append one transition to the replay memory.

        Note the stored order: ``(state, action, next_state, reward, done)``.
        """
        self.memory.append((state, action, next_state, reward, done))

    def get_action(self, state):
        """Choose an action with an epsilon-greedy policy.

        Args:
            state: Batched state passed unchanged to ``self.model.predict``;
                only used when the greedy branch is taken.

        Returns:
            The chosen action index as an ``int``.
        """
        # Explore: with probability `exploration_rate` pick a random action.
        if np.random.rand() <= self.exploration_rate:
            return int(np.random.randint(self.action_size))
        # Exploit: pick the action with the highest predicted value.
        # verbose=0 keeps Keras from printing a progress bar on every step.
        return int(np.argmax(self.model.predict(state, verbose=0)))

    def experience_replay(self, batch_size):
        """Train the Q-network on a random minibatch and decay exploration.

        Each sampled transition is fitted individually (one ``fit`` call per
        sample), so a replay performs ``batch_size`` sequential updates.

        Args:
            batch_size: Number of transitions to sample from the memory.

        Raises:
            ValueError: From ``random.sample`` if ``batch_size`` exceeds the
                number of stored transitions.
        """
        minibatch = random.sample(self.memory, batch_size)
        for state, action, next_state, reward, done in minibatch:
            target = reward
            if not done:
                # Bootstrapped target: reward plus discounted best next value.
                next_values = self.model.predict(next_state, verbose=0)[0]
                target = reward + self.gamma * np.amax(next_values)
            # Only the taken action's output is moved toward the target; the
            # other outputs keep their current predictions.
            target_f = self.model.predict(state, verbose=0)
            target_f[0][action] = target
            self.model.fit(state, target_f, epochs=1, verbose=0)

        # Decay the exploration rate, but never let the final multiplication
        # push it below the configured minimum.
        if self.exploration_rate > self.exploration_rate_min:
            self.exploration_rate = max(
                self.exploration_rate_min,
                self.exploration_rate * self.exploration_decay,
            )

    def load(self, name):
        """Load network weights from the file ``name``."""
        self.model.load_weights(name)

    def save(self, name):
        """Save network weights to the file ``name``."""
        self.model.save_weights(name)


class ImageTransformationEnv(gym.Env):
    """Environment in which an agent edits an image to raise a classifier score.

    Each of the 17 discrete actions applies one small transformation to the
    current image: a one-pixel wrap-around shift, a per-channel value change
    of +/-1 or +/-10, or a 3x3 Gaussian blur. The reward is the first output
    value the classifier produces for the image divided by 255.

    ``step`` returns a 3-tuple ``(image, reward, done)``, not the 4-tuple
    used by the standard gym API.
    """

    # action id -> (message, axis, shift) for wrap-around shifts via np.roll.
    # NOTE(review): np.roll with a positive shift moves pixels toward higher
    # indices (down / right), which looks opposite to the printed labels
    # (up / left); behavior is kept as-is -- confirm the intended direction.
    _SHIFT_ACTIONS = {
        0: ("상 이동", 0, 1),
        1: ("하 이동", 0, -1),
        2: ("좌 이동", 1, 1),
        3: ("우 이동", 1, -1),
    }

    # action id -> (message, channel index, delta) for channel value changes.
    _CHANNEL_ACTIONS = {
        4: ("R 채널 증가", 0, 1),
        5: ("G 채널 증가", 1, 1),
        6: ("B 채널 증가", 2, 1),
        7: ("R 채널 감소", 0, -1),
        8: ("G 채널 감소", 1, -1),
        9: ("B 채널 감소", 2, -1),
        11: ("R 채널 10 만큼 증가", 0, 10),
        12: ("G 채널 10 만큼 증가", 1, 10),
        13: ("B 채널 10 만큼 증가", 2, 10),
        14: ("R 채널 10 만큼 감소", 0, -10),
        15: ("G 채널 10 만큼 감소", 1, -10),
        16: ("B 채널 10 만큼 감소", 2, -10),
    }

    def __init__(self, init_image=None, classifier=None):
        """Set up the action/observation spaces and the starting image.

        Args:
            init_image: Array used as the starting image of every episode; it
                is copied. The declared observation space is 64x64x3 uint8,
                but the image is not validated against it.
            classifier: Object with a ``predict`` method used to score
                images. When ``None``, the module-level ``model`` is used
                instead, matching the earlier behavior of this class.
        """
        super().__init__()
        self.action_space = spaces.Discrete(17)
        self.observation_space = spaces.Box(low=0, high=255, shape=(64, 64, 3), dtype=np.uint8)

        self.classifier = classifier

        # Always define both attributes so reset()/render() can give a clear
        # message instead of failing with AttributeError.
        self.init_image = None
        self.current_image = None
        if init_image is not None:
            self.init_image = init_image.copy()
            self.current_image = init_image.copy()

    def reset(self):
        """Restore the starting image and return it.

        Raises:
            ValueError: If the environment was created without ``init_image``.
        """
        if self.init_image is None:
            raise ValueError("ImageTransformationEnv.reset() requires an init_image")
        self.current_image = self.init_image.copy()
        return self.current_image

    def _calculate_reward(self, selected_image_normalized):
        """Return the classifier's first output value for one image.

        Args:
            selected_image_normalized: A single image (no batch axis); a
                batch axis is added before calling ``predict``.
        """
        # Prefer the injected classifier; fall back to the global `model`
        # for callers that never passed one.
        classifier = self.classifier if self.classifier is not None else model
        cat_prob = classifier.predict(np.expand_dims(selected_image_normalized, axis=0))[0][0]
        return cat_prob

    def step(self, action):
        """Apply ``action`` and score the resulting image.

        The reward is computed on the image *after* the transformation so
        that it reflects the action that was just taken.

        Returns:
            Tuple ``(current_image, reward, done)`` where ``done`` is true
            once the reward reaches 0.7.
        """
        self.current_image = self.apply_action(action)

        selected_image_normalized = self.current_image / 255.
        reward = self._calculate_reward(selected_image_normalized)

        done = reward >= 0.7  # target reward that ends the episode

        return self.current_image, reward, done

    def render(self):
        """Show the current image in an OpenCV window, if there is one."""
        if self.current_image is not None:
            cv2.imshow("Environment Render", self.current_image)
            cv2.waitKey(1)
        else:
            print("이미지가 없어 화면에 그릴 수 없습니다.")

    def apply_action(self, action):
        """Return a transformed copy of the current image.

        ``self.current_image`` itself is not modified. Action ids outside
        0..16 return an unchanged copy.

        Args:
            action: Action id; 0-3 shift, 4-9 change a channel by +/-1,
                10 blurs, 11-16 change a channel by +/-10.
        """
        result_img = self.current_image.copy()

        if action in self._SHIFT_ACTIONS:
            message, axis, shift = self._SHIFT_ACTIONS[action]
            print(message)
            result_img = np.roll(result_img, shift, axis=axis)
        elif action in self._CHANNEL_ACTIONS:
            message, channel, delta = self._CHANNEL_ACTIONS[action]
            print(message)
            # Do the arithmetic in a wider dtype: on uint8 data, adding or
            # subtracting wraps around (255 + 1 -> 0) before np.clip can
            # saturate the value, which made the clip ineffective.
            wide_dtype = np.result_type(result_img.dtype, np.int16)
            shifted = result_img[:, :, channel].astype(wide_dtype) + delta
            result_img[:, :, channel] = np.clip(shifted, 0, 255)
        elif action == 10:
            print("가우시안 블러 적용")
            result_img = cv2.GaussianBlur(result_img, (3, 3), 0)

        return result_img

# Training driver: repeatedly runs episodes until some step reaches a reward
# of at least 0.9, saving every intermediate image to `result_images/`.
#
# NOTE: `selected_img` and `model` are not defined in this part of the file;
# they are expected to exist already (e.g. from an earlier notebook cell).
batch_size = 32
checkpoint_interval = 1000  # environment steps between weight checkpoints

# Output folders for the per-step images and the weight checkpoints.
os.makedirs('result_images', exist_ok=True)
os.makedirs('checkpoints', exist_ok=True)

env = ImageTransformationEnv(init_image=selected_img, classifier=model)
state_size = env.observation_space.shape
action_size = env.action_space.n
agent = DQNAgent(state_size, action_size)

episode = 0
max_reward = 0
total_steps = 0            # steps taken across all episodes
last_checkpoint_step = 0   # value of total_steps at the last checkpoint

while max_reward < 0.9:  # run until the target reward has been reached
    state = env.reset()
    state = np.reshape(state, [1, *state_size])

    done = False
    time = 0  # step counter within the current episode

    episode_images = []

    while not done:
        action = agent.get_action(state)
        next_state, reward, done = env.step(action)

        episode_images.append(next_state)  # keep the image for saving later
        next_state = np.reshape(next_state, [1, *state_size])

        agent.remember(state, action, next_state, reward, done)
        state = next_state
        time += 1
        total_steps += 1

        max_reward = max(max_reward, reward)

        if done or time >= 100:  # cap on steps per episode
            break

        # Experience replay. The agent decays its exploration rate inside
        # experience_replay(), so it is not decayed a second time here.
        if len(agent.memory) >= batch_size:
            agent.experience_replay(batch_size)

    # Checkpoint based on the global step count. The per-episode counter
    # never exceeds 100, so a `time % 1000 == 0` test could never fire.
    # NOTE(review): newer Keras versions may require weight files to end in
    # `.weights.h5` -- confirm against the installed version.
    if total_steps - last_checkpoint_step >= checkpoint_interval:
        agent.save(f'checkpoints/episode_{episode}_time_{time}.h5')
        last_checkpoint_step = total_steps

    if episode % 10 == 0:
        # The reported reward is the best reward seen over all episodes.
        print(f"에피소드: {episode}, 리워드: {max_reward}, 탐험비율: {agent.exploration_rate}")

    # Save every intermediate image of this episode.
    # NOTE(review): cv2.imwrite interprets 3-channel data as BGR -- confirm
    # the channel order of `selected_img` if colors look swapped.
    for idx, img in enumerate(episode_images):
        cv2.imwrite(f'result_images/episode_{episode}_step_{idx}.png', img)

    episode += 1
