In [25]:
import numpy as np

import tensorflow as tf
from tensorflow.keras.models import Sequential, Model, load_model
from tensorflow.keras.layers import Dense, Conv2D, Flatten, Input, BatchNormalization, Activation, add, Dropout
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
import matplotlib.pyplot as plt
import os
import random
import time
import importlib

from tqdm.notebook import tqdm

from settings import ROWS, COLUMNS, PLAYER, AI, EMPTY
from board import is_valid_location, get_next_open_row, get_valid_locations, create_board, drop_piece, winning_move
import MCTS
importlib.reload(MCTS)
from MCTS import mcts_search

In [26]:
from tensorflow.keras.callbacks import ReduceLROnPlateau
from tensorflow.keras.regularizers import l2

In [27]:
# Configure TensorFlow devices for this notebook session.
# Memory growth is requested for the first GPU only (as before).
try:
    physical_devices = tf.config.list_physical_devices('GPU')
    if physical_devices:
        tf.config.experimental.set_memory_growth(physical_devices[0], True)
        print("Đang sử dụng GPU")
except Exception as exc:
    # Catch Exception instead of using a bare `except:` so KeyboardInterrupt /
    # SystemExit still propagate, and show the real cause: a failure here is
    # not necessarily "no GPU".
    print("Không tìm thấy GPU, sử dụng CPU")
    print(f"  ({type(exc).__name__}: {exc})")

# Eager execution of tf.functions is enabled on both the GPU and the CPU path.
tf.config.run_functions_eagerly(True)

In [40]:
class Connect4NeuralNetwork:
    """Residual CNN with a policy head and a value head for Connect 4 boards.

    Input is a (ROWS, COLUMNS, 3) one-hot tensor (AI cells / PLAYER cells /
    all other cells). Outputs are a softmax over COLUMNS moves ('policy') and
    a tanh scalar ('value'). Predictions are cached per board until the cache
    is cleared (which `train` does after fitting).
    """

    def __init__(self, model_path=None):
        """Load the model at ``model_path`` if that file exists, else build a new one."""
        self.model = None
        self.model_path = model_path
        # Maps the string produced by _board_to_key to a cached (policy, value) pair.
        self._prediction_cache = {}
        self.model_ready = False

        if model_path and os.path.exists(model_path):
            self.load_model(model_path)
        else:
            self.build_model()

    def _compile(self, policy_weight, value_weight):
        """Compile ``self.model`` with the shared optimizer, losses and metrics.

        Args:
            policy_weight: loss weight applied to the 'policy' output.
            value_weight: loss weight applied to the 'value' output.
        """
        self.model.compile(
            optimizer=Adam(learning_rate=0.00005),
            loss={
                'policy': 'categorical_crossentropy',
                'value': 'mean_squared_error'
            },
            loss_weights={
                'policy': policy_weight,
                'value': value_weight
            },
            metrics={
                'policy': 'accuracy',
                'value': 'mean_absolute_error'
            }
        )

    def _conv_bn_relu(self, x, filters, kernel_size):
        """Apply Conv2D ('same' padding, L2 1.5e-4) -> BatchNormalization -> ReLU."""
        x = Conv2D(filters, kernel_size, padding='same', kernel_regularizer=l2(0.00015))(x)
        x = BatchNormalization()(x)
        return Activation('relu')(x)

    def build_model(self):
        """Build and compile a fresh network, print its summary and mark it ready."""
        inputs = Input(shape=(ROWS, COLUMNS, 3))

        # Shared trunk: stem convolution followed by five residual blocks.
        x = self._conv_bn_relu(inputs, 128, (3, 3))
        x = self._residual_block(x, 64)
        x = self._residual_block(x, 64)
        x = self._residual_block(x, 64)
        x = Dropout(0.15)(x)
        x = self._residual_block(x, 128)
        x = Dropout(0.15)(x)
        x = self._residual_block(x, 128)

        # Policy head: two 1x1 convolutions, then a softmax over the columns.
        policy_head = self._conv_bn_relu(x, 64, (1, 1))
        policy_head = self._conv_bn_relu(policy_head, 32, (1, 1))
        policy_head = Flatten()(policy_head)
        policy_head = Dropout(0.275)(policy_head)
        policy_head = Dense(COLUMNS, activation='softmax', name='policy', kernel_regularizer=l2(0.00015))(policy_head)

        # Value head: two 1x1 convolutions, two dense layers, tanh scalar output.
        value_head = self._conv_bn_relu(x, 64, (1, 1))
        value_head = self._conv_bn_relu(value_head, 32, (1, 1))
        value_head = Flatten()(value_head)
        value_head = Dense(64, activation='relu', kernel_regularizer=l2(0.0002))(value_head)
        value_head = Dropout(0.5)(value_head)
        value_head = Dense(32, activation='relu', kernel_regularizer=l2(0.0002))(value_head)
        value_head = Dropout(0.3)(value_head)
        value_head = Dense(1, activation='tanh', name='value', kernel_regularizer=l2(0.0002))(value_head)

        self.model = Model(inputs=inputs, outputs=[policy_head, value_head])
        self._compile(policy_weight=1.2, value_weight=2.5)

        self.model.summary()
        self.model_ready = True

    def _residual_block(self, x, filters):
        """Two 3x3 conv/BN layers whose output is added to a skip connection.

        The block learns the residual (difference) relative to its input.
        When the input does not already have ``filters`` channels, the
        shortcut is projected with a 1x1 convolution. Without this the
        element-wise add receives tensors with different channel counts
        (128 -> 64 and 64 -> 128 in build_model) and the model cannot be built.
        """
        shortcut = x
        if shortcut.shape[-1] != filters:
            shortcut = Conv2D(filters, (1, 1), padding='same', kernel_regularizer=l2(0.00015))(shortcut)

        x = self._conv_bn_relu(x, filters, (3, 3))
        x = Conv2D(filters, (3, 3), padding='same', kernel_regularizer=l2(0.00015))(x)
        x = BatchNormalization()(x)

        x = add([x, shortcut])
        return Activation('relu')(x)

    def _board_to_key(self, board):
        """Return the cells of ``board`` in row-major order, joined into one string."""
        return "".join(str(board[r][c]) for r in range(ROWS) for c in range(COLUMNS))

    def _board_to_tensor(self, board):
        """One-hot encode ``board`` as a float32 (ROWS, COLUMNS, 3) array.

        Channel 0 marks AI cells, channel 1 PLAYER cells and channel 2 every
        remaining cell (EMPTY).
        """
        tensor = np.zeros((ROWS, COLUMNS, 3), dtype=np.float32)

        grid = np.asarray(board)[:ROWS, :COLUMNS]
        ai_cells = grid == AI
        # AI is tested first, exactly like the original if/elif chain.
        player_cells = (grid == PLAYER) & ~ai_cells

        tensor[..., 0] = ai_cells
        tensor[..., 1] = player_cells
        tensor[..., 2] = ~(ai_cells | player_cells)

        return tensor

    def predict(self, board):
        """Return ``(policy, value)`` for ``board``, using the cache when possible.

        If the model is not ready, a uniform policy and a value of 0.0 are
        returned. Cached results are returned as stored, so callers should not
        modify the policy array in place.
        """
        if not self.model_ready:
            default_policy = np.ones(COLUMNS) / COLUMNS
            default_value = 0.0
            return default_policy, default_value

        board_key = self._board_to_key(board)

        if board_key in self._prediction_cache:
            return self._prediction_cache[board_key]

        board_tensor = self._board_to_tensor(board)
        policy, value = self.model.predict(np.expand_dims(board_tensor, axis=0), verbose=0)

        # Strip the batch dimension: policy -> (COLUMNS,), value -> scalar.
        result = (policy[0], value[0][0])

        self._prediction_cache[board_key] = result

        return result

    def clear_cache(self):
        """Drop all cached predictions."""
        self._prediction_cache = {}

    def train(self, boards, policies, values, epochs=15, batch_size=64, validation_split=0.2):
        """Fit the model on the given positions and return the Keras History.

        Args:
            boards: sequence of boards accepted by _board_to_tensor.
            policies: per-board target move distributions.
            values: per-board scalar targets.
            epochs, batch_size, validation_split: forwarded to ``Model.fit``.

        Raises:
            ValueError: if ``boards`` is empty.
        """
        if len(boards) == 0:
            raise ValueError("boards must contain at least one position")

        X = np.array([self._board_to_tensor(board) for board in boards])
        y_policy = np.array(policies, dtype=np.float32)
        y_value = np.array(values, dtype=np.float32).reshape(-1, 1)

        checkpoint_path = 'models/best_connect4_model.h5'
        # Make sure the checkpoint directory exists, as save_model does for its own path.
        os.makedirs(os.path.dirname(checkpoint_path), exist_ok=True)

        callbacks = [
            ModelCheckpoint(
                filepath=checkpoint_path,
                save_best_only=True,
                monitor='val_loss',
                mode='min'
            ),
            EarlyStopping(
                monitor='val_loss',
                patience=12,
                restore_best_weights=True
            ),
            ReduceLROnPlateau(
                monitor='val_loss',
                factor=0.6,
                patience=3,
                min_lr=0.000008
            )
        ]

        history = self.model.fit(
            X,
            {'policy': y_policy, 'value': y_value},
            epochs=epochs,
            batch_size=batch_size,
            validation_split=validation_split,
            callbacks=callbacks,
            verbose=1
        )

        # The weights changed, so cached predictions are stale.
        self.clear_cache()

        return history

    def save_model(self, path='models/connect4_model.h5'):
        """Save the model to ``path``, creating the parent directory if there is one."""
        directory = os.path.dirname(path)
        # os.makedirs('') raises FileNotFoundError, so skip it for bare file names.
        if directory:
            os.makedirs(directory, exist_ok=True)
        self.model.save(path)
        print(f"Model đã được lưu tại {path}")

    def load_model(self, path):
        """Load a saved Keras model from ``path`` and recompile it for training."""
        self.model = tf.keras.models.load_model(path)
        print(f"Model đã được tải từ {path}")

        # NOTE(review): these loss weights (1.3 / 2.4) differ from the ones used
        # in build_model (1.2 / 2.5); both are kept as found -- confirm which
        # pair is intended.
        self._compile(policy_weight=1.3, value_weight=2.4)
        self.model_ready = True

    def generate_training_data(self, mcts_games, augment=True):
        """Turn ``(board, policy, result)`` samples into parallel training lists.

        With ``augment`` enabled, each sample also yields a left-right mirrored
        copy and, when the policy is non-empty, a copy whose policy entries
        above 0.03 are perturbed with Gaussian noise (sigma 0.025) and
        renormalised. Boards and policies may be NumPy arrays or plain
        sequences.

        Returns:
            ``(boards, policies, values)`` lists of equal length.
        """
        boards = []
        policies = []
        values = []

        for board, policy, result in mcts_games:
            # Accept plain lists as well as arrays; the noise step below needs
            # array semantics (.shape, element-wise comparison).
            board = np.asarray(board)
            policy = np.asarray(policy, dtype=float)

            boards.append(board)
            policies.append(policy)
            values.append(result)

            if not augment:
                continue

            # Horizontal mirror: flip the board columns and the move distribution.
            boards.append(np.flip(board, axis=1).copy())
            policies.append(np.flip(policy).copy())
            values.append(result)

            # Add slight noise to the policy to increase diversity.
            if len(policy) > 0:
                noise = np.random.normal(0, 0.025, size=policy.shape)
                noisy_policy = np.where(policy > 0.03, policy + noise, policy)
                noisy_policy = np.maximum(noisy_policy, 0)
                total = np.sum(noisy_policy)
                if total > 0:
                    boards.append(board)
                    policies.append(noisy_policy / total)
                    values.append(result)

        return boards, policies, values

In [41]:
class Connect4Agent:
    """Move-selection wrapper that runs MCTS guided by a neural network."""

    def __init__(self, neural_network=None, model_path=None):
        """Use ``neural_network`` when it is truthy, else create one from ``model_path``."""
        self.nn = neural_network if neural_network else Connect4NeuralNetwork(model_path)
        self.temperature = 1.0
        self.batch_size = 32

    def fast_mcts(self, board, num_simulations=3000, temperature_decay=True):
        """Run ``mcts_search`` for AI and return ``(move, policy)``.

        With ``temperature_decay`` the returned policy is raised to the power
        ``1 / temperature`` and renormalised, where the temperature falls from
        1.0 towards a floor of 0.2 as the board fills up. The returned move is
        the one chosen by ``mcts_search`` and is not affected by the
        temperature.
        """
        # Fraction of cells already occupied, from 0.0 to 1.0.
        filled_fraction = np.count_nonzero(board != EMPTY) / (ROWS * COLUMNS)
        temperature = max(0.2, 1.0 - filled_fraction * 0.7) if temperature_decay else 1.0

        move, mcts_policy = mcts_search(board, AI, self.nn, num_simulations)

        if temperature == 1.0:
            return move, mcts_policy

        sharpened = np.power(mcts_policy + 1e-10, 1.0/temperature)
        sharpened /= np.sum(sharpened)
        return move, sharpened

In [30]:
def collect_self_play_data(agent, num_games=50, mcts_simulations=2000):
    """Play games between the network-guided agent (AI) and pure MCTS (PLAYER).

    The side that moves first alternates from game to game. Every position is
    stored with the search policy and, once the game is over, with the final
    result seen from the side that was to move (+1 win, -1 loss, 0 draw).

    Args:
        agent: object exposing ``fast_mcts(board, num_simulations, temperature_decay=...)``.
        num_games: number of games to play.
        mcts_simulations: simulations per agent move (the pure-MCTS opponent
            uses ``mcts_search``'s own default).

    Returns:
        List of ``(board, policy, value)`` tuples.
    """
    training_data = []
    win_stats = {'AI': 0, 'PLAYER': 0, 'DRAW': 0}

    for game_idx in tqdm(range(num_games), desc="Self-play games"):
        board = create_board()
        game_memory = []
        current_player = AI if game_idx % 2 == 0 else PLAYER  # alternate who moves first

        game_start_time = time.time()

        while True:
            valid_moves = get_valid_locations(board)

            if not valid_moves:
                result = 0.0
                win_stats['DRAW'] += 1
                break

            if current_player == AI:
                move, mcts_policy = agent.fast_mcts(board, mcts_simulations, temperature_decay=True)
            else:
                # Fix: search for the side that is about to move. The original
                # passed AI here although the move is played with PLAYER's piece.
                # Assumes mcts_search's second argument is the player to move --
                # TODO confirm against MCTS.py.
                move, mcts_policy = mcts_search(board, current_player, neural_network=None)
            game_memory.append((board.copy(), mcts_policy, current_player))

            row = get_next_open_row(board, move)
            drop_piece(board, row, move, current_player)

            if winning_move(board, current_player)[0]:
                if current_player == AI:
                    result = 1.0
                    win_stats['AI'] += 1
                else:
                    result = -1.0
                    win_stats['PLAYER'] += 1
                break

            current_player = PLAYER if current_player == AI else AI

        # `result` is from AI's point of view; negate it for PLAYER's positions.
        for board_state, policy, player in game_memory:
            value = result if player == AI else -result
            training_data.append((board_state, policy, value))

        game_time = time.time() - game_start_time
        print(f"Game {game_idx+1} hoàn thành trong {game_time:.1f}s: {'AI' if result > 0 else 'PLAYER' if result < 0 else 'HOÀ'}")

        if (game_idx + 1) % 5 == 0:
            print(f"Thống kê sau {game_idx + 1} trò chơi:")
            print(f"NN thắng: {win_stats['AI']} ({win_stats['AI']/(game_idx+1)*100:.1f}%)")
            print(f"MCTS thắng: {win_stats['PLAYER']} ({win_stats['PLAYER']/(game_idx+1)*100:.1f}%)")
            print(f"Hoà: {win_stats['DRAW']} ({win_stats['DRAW']/(game_idx+1)*100:.1f}%)")

    print(f"Thống kê cuối cùng - NN thắng: {win_stats['AI']}, MCTS thắng: {win_stats['PLAYER']}, Hoà: {win_stats['DRAW']}")
    return training_data

In [33]:
def train_model_cycle(cycles=5, games_per_cycle=50, epochs_per_cycle=15, mcts_simulations=2000):
    """Alternate self-play data collection and network training for ``cycles`` rounds.

    Each round collects games, trains on the augmented positions, saves the
    model (per-cycle snapshot plus the rolling 'models/connect4_model.h5')
    and plots the training curves.

    Returns:
        ``(nn, training_history)``: the trained network and one Keras History
        per cycle.
    """
    latest_path = 'models/connect4_model.h5'
    nn = Connect4NeuralNetwork(latest_path if os.path.exists(latest_path) else None)
    agent = Connect4Agent(nn)

    training_history = []

    # (history key, title, y-label) for the two panels of the per-cycle figure.
    panels = (
        ('policy_accuracy', 'Policy Accuracy', 'Accuracy'),
        ('value_mean_absolute_error', 'Value MAE', 'MAE'),
    )

    for cycle in range(cycles):
        cycle_no = cycle + 1
        started_at = time.time()
        print(f"=== Bắt đầu chu kỳ {cycle + 1}/{cycles} ===")

        # Collect self-play data.
        print("Thu thập dữ liệu tự chơi...")
        training_data = collect_self_play_data(agent, num_games=games_per_cycle, mcts_simulations=mcts_simulations)

        # Prepare the training set.
        boards, policies, values = nn.generate_training_data(training_data)
        print(f"Dữ liệu thu thập: {len(boards)} vị trí")

        print("Huấn luyện neural network...")
        history = nn.train(boards, policies, values, epochs=epochs_per_cycle, batch_size=32)
        training_history.append(history)

        nn.save_model(f'models/connect4_model_cycle_{cycle_no}.h5')
        nn.save_model('models/connect4_model.h5')

        fig, axes = plt.subplots(1, 2, figsize=(12, 4))
        for ax, (metric, title, ylabel) in zip(axes, panels):
            ax.plot(history.history[metric])
            ax.plot(history.history['val_' + metric])
            ax.set_title(title)
            ax.set_ylabel(ylabel)
            ax.set_xlabel('Epoch')
            ax.legend(['Train', 'Validation'], loc='upper left')

        fig.tight_layout()
        fig.savefig(f'models/training_cycle_{cycle_no}.png')
        plt.show()

        elapsed = time.time() - started_at
        print(f"Chu kỳ {cycle + 1} hoàn thành trong {elapsed/60:.2f} phút")

    return nn, training_history

In [8]:
import pygame as pg
import sys
from time import sleep
import random as rd

from settings import *

from importlib import reload
import utils
reload(utils)
from utils import create_gradient_background, draw_board, animate_piece_drop, show_game_over

In [19]:
def evaluate_model(agent, num_games=20, opponent='mcts'):
    """Play ``num_games`` games between ``agent`` and an opponent and tally results.

    The agent always plays the AI pieces (the side ``fast_mcts`` searches for)
    and the opponent the PLAYER pieces; the agent moves first in even-numbered
    games and the opponent in odd-numbered ones.

    Args:
        agent: object exposing ``fast_mcts(board, num_simulations, temperature_decay=...)``.
        num_games: number of games to play.
        opponent: 'mcts' for pure MCTS; any other value selects uniformly
            random valid moves.

    Returns:
        Dict with the keys 'Agent', 'Opponent' and 'Draw'.
    """
    win_stats = {'Agent': 0, 'Opponent': 0, 'Draw': 0}

    for game_idx in tqdm(range(num_games), desc="Evaluation games"):
        board = create_board()

        agent_first = game_idx % 2 == 0
        current_player = AI if agent_first else PLAYER

        while True:
            valid_moves = get_valid_locations(board)

            if not valid_moves:
                win_stats['Draw'] += 1
                break

            # Fix: the original turn test always gave the first move to the
            # opponent and made the agent play PLAYER pieces in half the games
            # while still searching as AI. The agent is now simply "AI".
            agent_turn = current_player == AI

            if agent_turn:
                move, _ = agent.fast_mcts(board, 300, temperature_decay=False)
            elif opponent == 'mcts':
                # Pure MCTS searching for the opponent's own side. Assumes
                # mcts_search's second argument is the player to move --
                # TODO confirm against MCTS.py.
                move, _ = mcts_search(board, PLAYER, neural_network=None)
            else:
                move = random.choice(valid_moves)

            row = get_next_open_row(board, move)
            drop_piece(board, row, move, current_player)

            if winning_move(board, current_player)[0]:
                win_stats['Agent' if agent_turn else 'Opponent'] += 1
                break

            current_player = PLAYER if current_player == AI else AI

    # Avoid ZeroDivisionError in the percentages when no games were requested.
    total = max(num_games, 1)
    print(f"Kết quả đánh giá với {opponent}:")
    print(f"Agent thắng: {win_stats['Agent']} ({win_stats['Agent']/total*100:.1f}%)")
    print(f"Opponent thắng: {win_stats['Opponent']} ({win_stats['Opponent']/total*100:.1f}%)")
    print(f"Hoà: {win_stats['Draw']} ({win_stats['Draw']/total*100:.1f}%)")

    return win_stats


In [45]:
# Run two self-play/training cycles (50 games and up to 20 epochs each),
# then wrap the resulting network in an agent.
nn, history = train_model_cycle(cycles=2, games_per_cycle=50, epochs_per_cycle=20, mcts_simulations=2000)

agent = Connect4Agent(nn)



Model đã được tải từ models/connect4_model.h5
=== Bắt đầu chu kỳ 1/2 ===
Thu thập dữ liệu tự chơi...


Self-play games:   0%|          | 0/50 [00:00<?, ?it/s]

MCTS: 30000 mô phỏng trong 7.385s
  Cột 3: 27457 lần thăm, tỷ lệ thắng: 0.790
  Cột 2: 456 lần thăm, tỷ lệ thắng: 0.636
  Cột 4: 451 lần thăm, tỷ lệ thắng: 0.634
  Cột 6: 448 lần thăm, tỷ lệ thắng: 0.701
  Cột 5: 422 lần thăm, tỷ lệ thắng: 0.661
  Cột 0: 400 lần thăm, tỷ lệ thắng: 0.688
  Cột 1: 366 lần thăm, tỷ lệ thắng: 0.645
MCTS: 2000 mô phỏng trong 41.858s
  Cột 3: 789 lần thăm, tỷ lệ thắng: 0.418
  Cột 2: 245 lần thăm, tỷ lệ thắng: 0.330
  Cột 6: 226 lần thăm, tỷ lệ thắng: 0.386
  Cột 4: 195 lần thăm, tỷ lệ thắng: 0.298
  Cột 5: 193 lần thăm, tỷ lệ thắng: 0.331
  Cột 1: 188 lần thăm, tỷ lệ thắng: 0.328
  Cột 0: 164 lần thăm, tỷ lệ thắng: 0.342
MCTS: 30000 mô phỏng trong 7.587s
  Cột 3: 24182 lần thăm, tỷ lệ thắng: 0.643
  Cột 5: 1201 lần thăm, tỷ lệ thắng: 0.525
  Cột 2: 1109 lần thăm, tỷ lệ thắng: 0.486
  Cột 1: 924 lần thăm, tỷ lệ thắng: 0.508
  Cột 0: 923 lần thăm, tỷ lệ thắng: 0.541
  Cột 6: 881 lần thăm, tỷ lệ thắng: 0.537
  Cột 4: 780 lần thăm, tỷ lệ thắng: 0.462
MCTS: 2000

In [None]:
# Load the saved model (a fresh one is built if the file does not exist)
# and play a single evaluation game against pure MCTS.
nn = Connect4NeuralNetwork('models/connect4_model.h5')
agent = Connect4Agent(nn)
results_vs_mcts = evaluate_model(agent, num_games=1, opponent='mcts')

In [None]:
from ai_battle import main

# Load the saved model and hand the agent to ai_battle.main.
# NOTE(review): presumably this launches an interactive game -- confirm in ai_battle.py.
nn = Connect4NeuralNetwork('models/connect4_model.h5')
agent = Connect4Agent(nn)
main(agent)

In [17]:
# import pickle

# with open('models/evaluation_results.pkl', 'wb') as f:
#     pickle.dump({'vs_mcts': results_vs_mcts}, f)

# with open('models/training_history.pkl', 'wb') as f:
#     pickle.dump(history, f)

# print("Quá trình huấn luyện hoàn thành và đã lưu kết quả!")

Quá trình huấn luyện hoàn thành và đã lưu kết quả!


In [44]:
import shutil
# Copy the cycle-1 snapshot to the backup path (overwrites an existing backup file).
shutil.copy('models/connect4_model_cycle_1.h5', 'models/connect4_model_backup.h5')

'models/connect4_model_backup.h5'

In [113]:
# import shutil
# import os

# if os.path.exists('models/connect4_model_debug.h5'):
#     shutil.copy('models/connect4_model.h5', 'models/connect4_model_backup.h5')