<a href="https://colab.research.google.com/github/fix27/Colab-Store/blob/main/Bubble_Breaker_AI.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import numpy as np
import random
import tensorflow as tf
board_width=8
board_height=8
num_colors=4
num_actions=64


# Определение архитектуры нейронной сети DQN
model = tf.keras.Sequential([
    tf.keras.layers.Input(shape=(board_width, board_height, 1)),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(256, activation='relu'),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dense(num_actions)  # Выходной слой для предсказания Q-значений для каждого действия
])

# Определение оптимизатора
optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)

# Определение функции потерь
def compute_loss(target_q, predicted_q):
    return tf.reduce_mean(tf.square(target_q - predicted_q))

# Определение опыта и буфера воспроизведения
class ReplayBuffer:
    def __init__(self, buffer_size):
        self.buffer_size = buffer_size
        self.buffer = []

    def add(self, experience):
        if len(self.buffer) >= self.buffer_size:
            self.buffer.pop(0)
        self.buffer.append(experience)

    def sample(self, batch_size):
        return random.sample(self.buffer, batch_size)

# Определение обучающего цикла
def train_step(states, actions, rewards, next_states, done):
    with tf.GradientTape() as tape:
        q_values = model(states, training=True)
        selected_action_q_values = tf.reduce_sum(tf.one_hot(actions, num_actions) * q_values, axis=1)
        next_q_values = model(next_states, training=True)
        max_next_q_values = tf.reduce_max(next_q_values, axis=1)
        target_q = rewards + (1 - done) * discount_factor * max_next_q_values
        loss = compute_loss(target_q, selected_action_q_values)

    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(grads, model.trainable_variables))

# Определение параметров обучения
discount_factor = 0.99
exploration_max = 1.0
exploration_min = 0.01
exploration_decay = 0.995
batch_size = 64
buffer_size = 2000  # Добавлено определение buffer_size
num_episodes = 100
max_steps_per_episode = 64

def has_adjacent_matches(game_board_):
    """
    Проверяет, есть ли на игровом поле два соседних пузыря одинакового цвета.

    :param game_board: Текущее состояние игры (матрица)
    :return: True, если есть два соседних пузыря одинакового цвета, в противном случае False
    """
    for x in range(board_width):
        for y in range(board_height):
          if game_board_[x, y]>0:
            if x + 1 < board_width and game_board_[x, y] == game_board_[x + 1, y]:
                return True
            if y + 1 < board_height and game_board_[x, y] == game_board_[x, y + 1]:
                return True
    return False



# Создание функции для выполнения действия в игре и получения состояния и награды
def take_action(game_board, action_):
    """
    Выполняет действие в игре Bubble Breaker, удаляя пузырь и обновляя состояние игрового поля.

    :param game_board: Текущее состояние игры (матрица)
    :param x: Координата x выбранной ячейки
    :param y: Координата y выбранной ячейки
    :return: Новое состояние игры и награду (если есть)
    """
    x = action_ % 8
    y = action_ // 8
    #num_colors = np.max(game_board)  # Максимальное количество цветов

    if game_board[x, y] == 0:
        # Пустая ячейка, ничего не происходит
        return game_board, -1, False

    # Определить цвет пузыря, который нужно удалить
    target_color = game_board[x, y]

    # Создать маску для поиска связанных пузырей того же цвета
    visited = np.zeros_like(game_board, dtype=np.uint8)
    reward = 0
    def flood_fill(x, y):
        if x < 0 or x >= board_width or y < 0 or y >= board_height:
            return
        if visited[x, y] == 1 or game_board[x, y] != target_color:
            return
        visited[x, y] = 1
        game_board[x, y] = 0  # Удалить пузырь
        flood_fill(x - 1, y)
        flood_fill(x + 1, y)
        flood_fill(x, y - 1)
        flood_fill(x, y + 1)

    flood_fill(x, y)
    reward = np.sum( visited)
    if reward == 1 :
        game_board[x, y]=target_color
        return game_board, -1, False

    # Сжать игровое поле (опустить пузыри, если есть пустые места)
    for col in range(board_width):
        non_zero_elements = game_board[col][game_board[col] != 0]
        game_board[col] = np.concatenate((non_zero_elements, np.zeros(board_height - len(non_zero_elements))))

    # Определить награду (например, за количество удаленных пузырей)
    #reward = (num_colors - np.max(game_board))  # Награда может быть связана с количеством цветов на игровом поле
    done = not has_adjacent_matches(game_board)

    return game_board, reward, done



# Инициализация буфера воспроизведения
replay_buffer = ReplayBuffer(buffer_size)

def initialize_game():
    # Создаем пустое игровое поле (матрицу)
    game_board = np.zeros((board_width, board_height), dtype=int)

    # Заполняем игровое поле случайными цветами
    for x in range(board_width):
        for y in range(board_height):
            game_board[x, y] = random.randint(1, num_colors)  # Генерируем случайный цвет

    return game_board

# Пример использования
initial_state = initialize_game()
# Пример использования

game_board = initialize_game()

# Выполнение действия в игре
#new_game_board, reward = take_action(game_board, 2, 2)

# Основной обучающий цикл
for episode in range(num_episodes):
    state = initial_state
    episode_reward = 0

    for step in range(max_steps_per_episode):
        if random.uniform(0, 1) < exploration_max:
            action = random.choice(range(num_actions))
        else:
            q_values = model.predict(np.expand_dims(state, axis=0))
            action = np.argmax(q_values)

        next_state, reward, done = take_action(state, action)
        episode_reward += reward

        replay_buffer.add((state, action, reward, next_state, done))

        if len(replay_buffer.buffer) >= batch_size:
            experiences = replay_buffer.sample(batch_size)
            states, actions, rewards, next_states, dones = zip(*experiences)
            states = np.array(states)
            actions = np.array(actions)
            rewards = np.array(rewards)
            next_states = np.array(next_states)
            dones = np.array(dones)

            train_step(states, actions, rewards, next_states, dones)

        if done:
            break

        state = next_state

    if exploration_max > exploration_min:
        exploration_max *= exploration_decay





[1;30;43mВыходные данные были обрезаны до нескольких последних строк (5000).[0m


In [3]:
def print_game_board(game_board):
   # for y1 in range(board_height):
    #    for x1 in range(board_width):
    #        print(game_board[x1, y1], end=' ')
        print(game_board)

game_board = initialize_game()
print("Начальное состояние игры:")
print_game_board(game_board)
print()
for episode in range(60):
  q_values = model.predict(np.expand_dims(game_board, axis=0))
  action = np.argmax(q_values)
# Пример действия: удаление пузлей из ячейки (3, 3)

  game_board, reward, done = take_action(game_board, action)
#print(f"Состояние игры после действия в ячейке ({x}, {y}):")
  print_game_board(game_board)
  print(reward)
  print(done)

Начальное состояние игры:
[[1 3 1 3 2 4 1 3]
 [3 2 2 3 2 3 1 2]
 [1 3 4 2 4 1 2 1]
 [2 3 1 4 4 2 4 3]
 [3 4 3 4 2 3 2 4]
 [2 3 1 1 2 3 3 4]
 [1 2 3 2 1 2 3 1]
 [2 1 4 1 4 1 2 3]]

[[1 3 1 3 2 4 1 3]
 [3 2 2 3 2 3 1 2]
 [1 3 4 2 1 2 1 0]
 [2 3 1 2 4 3 0 0]
 [3 4 3 2 3 2 4 0]
 [2 3 1 1 2 3 3 4]
 [1 2 3 2 1 2 3 1]
 [2 1 4 1 4 1 2 3]]
4
False
[[1 3 1 3 2 4 1 3]
 [3 2 2 3 2 3 1 2]
 [1 3 4 1 2 1 0 0]
 [2 3 1 4 3 0 0 0]
 [3 4 3 3 2 4 0 0]
 [2 3 1 1 2 3 3 4]
 [1 2 3 2 1 2 3 1]
 [2 1 4 1 4 1 2 3]]
3
False
[[1 3 1 3 2 4 1 3]
 [3 2 2 3 2 3 1 2]
 [1 3 4 1 2 1 0 0]
 [2 3 1 4 3 0 0 0]
 [3 4 2 4 0 0 0 0]
 [2 3 1 1 2 3 3 4]
 [1 2 3 2 1 2 3 1]
 [2 1 4 1 4 1 2 3]]
2
False
[[1 3 1 3 2 4 1 3]
 [3 2 2 3 2 3 1 2]
 [1 3 4 1 2 1 0 0]
 [2 3 1 3 0 0 0 0]
 [3 4 2 0 0 0 0 0]
 [2 3 1 1 2 3 3 4]
 [1 2 3 2 1 2 3 1]
 [2 1 4 1 4 1 2 3]]
2
False
[[1 3 1 3 2 4 1 3]
 [3 2 2 3 2 3 1 2]
 [1 3 4 1 2 1 0 0]
 [2 3 1 3 0 0 0 0]
 [3 4 2 0 0 0 0 0]
 [2 3 1 1 2 3 3 4]
 [1 2 3 2 1 2 3 1]
 [2 1 4 1 4 1 2 3]]
-1
False
[[1 3 1 3 2 4 

In [57]:
import tensorflow as tf
import numpy as np
import random

# Определение архитектуры нейронной сети DQN
model = tf.ker  as.Sequential([
    tf.keras.layers.Input(shape=state_shape),
    tf.keras.layers.Dense(64, activation='relu'),
    tf.keras.layers.Dense(64, activation='relu'),
    tf.keras.layers.Dense(action_space)  # Выходной слой для предсказания Q-значений для каждого действия
])

# Определение оптимизатора
optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)

# Определение функции потерь
def compute_loss(target_q, predicted_q):
    return tf.reduce_mean(tf.square(target_q - predicted_q))

# Определение опыта и буфера воспроизведения
class ReplayBuffer:
    def __init__(self, buffer_size):
        self.buffer_size = buffer_size
        self.buffer = []

    def add(self, experience):
        if len(self.buffer) >= self.buffer_size:
            self.buffer.pop(0)
        self.buffer.append(experience)

    def sample(self, batch_size):
        return random.sample(self.buffer, batch_size)

# Определение обучающего цикла
def train_step(states, actions, rewards, next_states, done):
    with tf.GradientTape() as tape:
        q_values = model(states, training=True)
        selected_action_q_values = tf.reduce_sum(tf.one_hot(actions, action_space) * q_values, axis=1)
        next_q_values = model(next_states, training=True)
        max_next_q_values = tf.reduce_max(next_q_values, axis=1)
        target_q = rewards + (1 - done) * discount_factor * max_next_q_values
        loss = compute_loss(target_q, selected_action_q_values)

    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(grads, model.trainable_variables))

# Инициализация буфера воспроизведения
replay_buffer = ReplayBuffer(buffer_size)

# Определение параметров обучения
discount_factor = 0.99
exploration_max = 1.0
exploration_min = 0.01
exploration_decay = 0.995
batch_size = 64

# Обучение DQN
for episode in range(num_episodes):
    state = initial_state
    episode_reward = 0

    for step in range(max_steps_per_episode):
        if random.uniform(0, 1) < exploration_max:
            action = random.choice(range(action_space))
        else:
            q_values = model.predict(state)
            action = np.argmax(q_values)

        next_state, reward, done = take_action(action)
        episode_reward += reward

        replay_buffer.add((state, action, reward, next_state, done))

        if len(replay_buffer.buffer) >= batch_size:
            experiences = replay_buffer.sample(batch_size)
            states, actions, rewards, next_states, dones = zip(*experiences)
            states = np.array(states)
            actions = np.array(actions)
            rewards = np.array(rewards)
            next_states = np.array(next_states)
            dones = np.array(dones)

            train_step(states, actions, rewards, next_states, dones)

        if done:
            break

        state = next_state

    if exploration_max > exploration_min:
        exploration_max *= exploration_decay

NameError: ignored