<h2><center>Bot File</center></h2>
<h2>Core Function</h2>
<p>Implements a Deep Q-Learning (DQN) AI agent for a 2D fighting game.</p>

<h2>Key Features</h2>

<h3>DQN Agent</h3>
<ul>
  <li>Manages a neural network that learns optimal moves</li>
  <li>Handles experience replay and training</li>
  <li>Supports model saving/loading</li>
</ul>

<h3>Game Interaction</h3>
<ul>
  <li>Tracks 17 game state features (positions, health, moves)</li>
  <li>Converts 12 possible actions → button combinations (movement + attacks)</li>
  <li>Implements special moves (fireballs, dragon punches)</li>
</ul>

<h3>Adaptive Strategies</h3>
<ul>
  <li>Switches between defensive/aggressive modes based on health</li>
  <li>Maintains move cooldowns</li>
  <li>Logs actions for debugging</li>
</ul>

<h2>Workflow</h2>
<ol>
  <li>Observes game state</li>
  <li>Selects action using DQN</li>
  <li>Converts to controller inputs</li>
  <li>Stores experience and trains periodically</li>
</ol>



from dqn import DQNAgent
import torch
import numpy as np
from logger import logger
from command import Command
from buttons import Buttons

class Bot:
    """DQN-driven bot that turns game states into controller input.

    Each call to :meth:`fight` asks the DQN agent for an action, converts it
    to a ``Buttons`` object, stores the previous transition in the agent's
    replay memory and runs one training step.  The remaining methods
    (``update_state``, ``choose_action``, ``execute_special_move``,
    ``run_command``) are rule-based helpers that ``fight`` does not call.
    """

    # Button presses for each discrete action index chosen by the agent.
    _BUTTON_COMBINATIONS = (
        {'up': True},  # Jump
        {'down': True},  # Crouch
        {'left': True},  # Move left
        {'right': True},  # Move right
        {'Y': True},  # Heavy punch
        {'B': True},  # Medium punch
        {'A': True},  # Light punch
        {'X': True},  # Heavy kick
        {'L': True},  # Medium kick
        {'R': True},  # Light kick
        {'up': True, 'Y': True},  # Jump heavy punch
        {'down': True, 'B': True},  # Crouch medium punch
    )

    # Command-string sequences for the special moves, keyed by move type.
    _SPECIAL_MOVE_SEQUENCES = {
        "fireball": ["<", "!<", "v+<", "!v+!<", "v", "!v", "v+>", "!v+!>", ">+Y", "!>+!Y"],
        "dragon_punch": [">", "!>", "v+>", "!v+!>", "^+Y", "!^+!Y"],
        "spinning_kick": ["v", "!v", "^+B", "!^+!B"],
    }

    # Direction symbols used in command strings -> Buttons attribute names.
    _DIRECTION_SYMBOLS = {'^': 'up', 'v': 'down', '<': 'left', '>': 'right'}
    # Action buttons are stored as upper-case attributes on Buttons.
    _ACTION_BUTTONS = ('Y', 'B', 'A', 'X', 'L', 'R')

    # Number of stored transitions between target-network syncs / model saves.
    TARGET_UPDATE_INTERVAL = 1000
    MODEL_SAVE_INTERVAL = 10000

    def __init__(self, player_number=1):
        """Create the agent for ``player_number`` and load a saved model if one exists."""
        # Set player number
        self.player_number = player_number

        # Define action space (12 possible button combinations)
        self.action_size = 12
        # Define state size (17 features: player x, y, health, jumping, crouching, in_move, move_id,
        # opponent x, y, health, jumping, crouching, in_move, move_id, timer, round_started, round_over)
        self.state_size = 17

        # Initialize DQN agent
        self.agent = DQNAgent(self.state_size, self.action_size, self.player_number)
        self._load_model()

        # Previous step's tensor state, game-state object and action index;
        # together with the current step they form one replay transition.
        self.last_state = None
        self.last_game_state = None
        self.last_action = None
        # Transitions stored so far; drives the periodic target sync and save.
        self.train_steps = 0

        self.fire_code = ["<", "!<", "v+<", "!v+!<", "v", "!v", "v+>", "!v+!>", ">+Y", "!>+!Y"]
        self.exe_code = 0  # index of the next step run_command will issue
        self.start_fire = True
        self.remaining_code = []
        self.my_command = Command()
        self.buttn = Buttons()
        self.action_history = []
        self.combo_counter = 0
        self.defensive_mode = False
        self.aggressive_mode = False
        self.special_move_cooldown = 0

    def _load_model(self):
        """Load the player-specific model, falling back to the default model."""
        candidates = (
            (f'models/dqn_model_p{self.player_number}.pth', "existing"),
            ('models/dqn_model.pth', "default"),
        )
        for path, label in candidates:
            try:
                self.agent.load_model(path)
            except Exception as exc:
                # Best effort: a missing or unreadable file just means we try
                # the next candidate.  Unlike a bare ``except``, this lets
                # KeyboardInterrupt / SystemExit propagate.
                logger.debug(f"Could not load model {path}: {exc}")
                continue
            logger.info(f"Loaded {label} DQN model for player {self.player_number}")
            return
        logger.info(f"No existing model found for player {self.player_number}, starting fresh")

    def action_to_buttons(self, action):
        """Convert action index to button combination.

        Returns a fresh dict mapping ``Buttons`` attribute names to ``True``.
        Raises ``IndexError`` for an index outside the action table.
        """
        return dict(self._BUTTON_COMBINATIONS[action])

    def update_state(self, current_game_state, player):
        """Update defensive/aggressive mode and the special-move cooldown.

        Args:
            current_game_state: object exposing ``player1`` and ``player2``,
                each with a ``health`` attribute.
            player: ``"1"`` (or ``1``) for player 1; anything else selects
                player 2.

        Returns:
            ``(self_player, opponent)`` taken from ``current_game_state``.
        """
        if str(player) == "1":
            self_player = current_game_state.player1
            opponent = current_game_state.player2
        else:
            self_player = current_game_state.player2
            opponent = current_game_state.player1

        # Update defensive/aggressive modes based on health.  The opponent's
        # health reaches zero on a knockout, so guard the division.
        if opponent.health > 0:
            health_ratio = self_player.health / opponent.health
        elif self_player.health > 0:
            health_ratio = float('inf')
        else:
            health_ratio = 1.0

        self.defensive_mode = health_ratio < 0.5
        self.aggressive_mode = health_ratio > 1.5

        # Update special move cooldown
        if self.special_move_cooldown > 0:
            self.special_move_cooldown -= 1

        return self_player, opponent

    def execute_special_move(self, player, move_type="fireball"):
        """Issue the next step of a special move unless the cooldown is active.

        Returns ``True`` when a step was issued and the cooldown was armed,
        ``False`` when the cooldown is still running or ``move_type`` is not
        one of ``"fireball"``, ``"dragon_punch"`` or ``"spinning_kick"``.

        NOTE(review): ``run_command`` issues a single step per call while the
        30-tick cooldown is armed immediately -- confirm how callers are
        expected to advance the rest of the sequence.
        """
        if self.special_move_cooldown > 0:
            return False

        sequence = self._SPECIAL_MOVE_SEQUENCES.get(move_type)
        if sequence is None:
            # Unknown move: nothing was executed, so do not start a cooldown.
            return False

        self.run_command(sequence, player)
        self.special_move_cooldown = 30
        return True

    def choose_action(self, self_player, opponent):
        """Pick a rule-based action name from the current mode and distance.

        Distance is the absolute difference of the two ``x_coord`` values.
        """
        distance = abs(opponent.x_coord - self_player.x_coord)

        # Defensive actions
        if self.defensive_mode:
            if distance < 50:
                return "block"
            if distance < 100:
                return "move_away"
            return "fireball"

        # Aggressive actions
        if self.aggressive_mode:
            if distance > 100:
                return "move_close"
            if distance > 50:
                return "dragon_punch"
            return "combo"

        # Neutral actions
        if distance > 100:
            return "fireball"
        if distance > 50:
            return "move_close"
        return "combo"

    def fight(self, game_state, player_number):
        """Select this frame's buttons with the DQN agent and train on the last step.

        Args:
            game_state: current game-state object; passed to the agent and
                read for ``is_round_over``.
            player_number: player slot, convertible with ``int()``.

        Returns:
            A ``Buttons`` object with the chosen action's buttons pressed.
        """
        # Update player number if needed
        self.player_number = int(player_number)

        current_state = self.agent.get_state(game_state)
        action = self.agent.select_action(current_state)

        # The action table already uses the exact Buttons attribute names
        # (lower-case directions, upper-case action buttons).
        buttons = Buttons()
        for name, value in self.action_to_buttons(action).items():
            setattr(buttons, name, value)

        # Debug: log which buttons ended up pressed
        pressed_buttons = [
            btn
            for btn in ['up', 'down', 'left', 'right', 'Y', 'B', 'A', 'X', 'L', 'R']
            if getattr(buttons, btn, False)
        ]
        logger.info(f"Player {self.player_number} action {action}: {pressed_buttons}")

        # If we have a previous state and action, store the experience
        if self.last_state is not None and self.last_action is not None:
            reward = self.agent.get_reward(self.last_game_state, game_state)
            self.agent.memory.push(
                self.last_state,
                self.last_action,
                reward,
                current_state,
                game_state.is_round_over
            )
            self.train_steps += 1

            loss = self.agent.train()
            if loss is not None:
                logger.info(f"Training loss for player {self.player_number}: {loss:.4f}")

            # Schedule on an explicit step counter rather than len(memory):
            # the buffer length is 0 before the first push and would stop
            # changing if the replay buffer is bounded, which would make a
            # modulo on it fire on every frame or never again.
            if self.train_steps % self.TARGET_UPDATE_INTERVAL == 0:
                self.agent.update_target_network()
                logger.info(f"Updated target network for player {self.player_number}")

            if self.train_steps % self.MODEL_SAVE_INTERVAL == 0:
                model_path = f'models/dqn_model_p{self.player_number}.pth'
                self.agent.save_model(model_path)
                logger.info(f"Saved DQN model for player {self.player_number}")

        # Save current state and action for next step
        self.last_state = current_state  # Store the tensor state
        self.last_game_state = game_state  # Store the game state object
        self.last_action = action

        return buttons

    @classmethod
    def _button_attribute(cls, symbol):
        """Translate one command-string symbol into a ``Buttons`` attribute name."""
        direction = cls._DIRECTION_SYMBOLS.get(symbol.lower())
        if direction is not None:
            return direction
        if symbol.upper() in cls._ACTION_BUTTONS:
            return symbol.upper()
        return symbol.lower()

    def run_command(self, com, player):
        """Apply the next step of the command sequence ``com`` to ``self.buttn``.

        Each step is a ``+``-separated list of symbols; a leading ``!``
        releases the button instead of pressing it.  ``self.exe_code`` tracks
        the position in the sequence and is reset to 0 once the sequence has
        been exhausted.  ``player`` is accepted for interface compatibility
        and is not used.
        """
        if not com:
            return

        if self.exe_code >= len(com):
            self.exe_code = 0
            return

        self.buttn.init_buttons()

        for token in com[self.exe_code].split("+"):
            pressed = not token.startswith("!")
            symbol = token if pressed else token[1:]
            if not symbol:
                continue
            setattr(self.buttn, self._button_attribute(symbol), pressed)

        self.exe_code += 1


<h2><center>Buttons File</center></h2>
<div class="buttons-description">
  <p><strong>Buttons</strong> class manages game controller inputs for the fighting game AI:</p>
  
  <ul>
    <li>Represents all controller buttons (directions, actions, menu buttons)</li>
    <li>Three core methods:
      <ul>
        <li><code>init_buttons()</code>: Resets all buttons to False (unpressed)</li>
        <li><code>dict_to_object()</code>: Loads button states from dictionary</li>
        <li><code>object_to_dict()</code>: Exports current states as dictionary</li>
      </ul>
    </li>
    <li>Handles both directional inputs (up/down/left/right) and action buttons (Y/B/X/A/L/R)</li>
    <li>Includes menu buttons (Select/Start) for game navigation</li>
  </ul>
  
  <p>Serves as interface between AI decisions and game controller inputs.</p>
</div>


In [None]:

class Buttons:
    """Pressed/released state of every controller button.

    Directions and menu buttons are lower-case attributes (``up``,
    ``select`` ...); the six action buttons are upper-case (``Y`` ...).
    """

    # (attribute name, dictionary key) pairs, in the order used for
    # resetting, loading and exporting.
    _FIELDS = (
        ('up', 'Up'),
        ('down', 'Down'),
        ('right', 'Right'),
        ('left', 'Left'),
        ('select', 'Select'),
        ('start', 'Start'),
        ('Y', 'Y'),
        ('B', 'B'),
        ('X', 'X'),
        ('A', 'A'),
        ('L', 'L'),
        ('R', 'R'),
    )

    def __init__(self, buttons_dict=None):
        """Start from ``buttons_dict`` when given, otherwise with nothing pressed."""
        if buttons_dict is None:
            self.init_buttons()
        else:
            self.dict_to_object(buttons_dict)

    def init_buttons(self):
        """Mark every button as released."""
        for attr, _ in self._FIELDS:
            setattr(self, attr, False)

    def dict_to_object(self, buttons_dict):
        """Copy button states from ``buttons_dict``; a missing key raises ``KeyError``."""
        for attr, key in self._FIELDS:
            setattr(self, attr, buttons_dict[key])

    def object_to_dict(self):
        """Return the current button states keyed by their dictionary names."""
        return {key: getattr(self, attr) for attr, key in self._FIELDS}

<h2><center>Command File</center></h2>
<div class="command-description">
  <p><strong>Command</strong> class manages player inputs and game commands:</p>
  
  <ul>
    <li>Maintains button states for both players using <code>Buttons</code> class</li>
    <li>Tracks command type (default: "buttons") and player count (default: 2)</li>
    <li>Includes save game path for game state persistence</li>
    <li>Key method:
      <ul>
        <li><code>object_to_dict()</code>: Converts player inputs to dictionary format</li>
      </ul>
    </li>
  </ul>
  
  <p>Acts as a container for player commands and game meta-information.</p>
</div>


In [None]:
from buttons import Buttons

class Command:
    """Container for both players' button states plus command metadata."""

    def __init__(self):
        """Start with no buttons pressed, type "buttons" and an empty save path."""
        self.player_buttons = Buttons()
        self.player2_buttons = Buttons()
        self.type = "buttons"
        self.__player_count = 2
        self.save_game_path = ""

    def object_to_dict(self):
        """Return the command as a plain dictionary ready for serialization."""
        return {
            'p1': self.player_buttons.object_to_dict(),
            'p2': self.player2_buttons.object_to_dict(),
            'type': self.type,
            'player_count': self.__player_count,
            'savegamepath': self.save_game_path,
        }

<h2><center>Config File</center></h2>
<div class="config-description">
  <p><strong>Game Configuration</strong> - Centralized settings for the Street Fighter AI:</p>
  
  <ul>
    <li><strong>Memory Addresses</strong>:
      <ul>
        <li>Player positions, health, and state (P1/P2)</li>
        <li>Game timer and round status</li>
      </ul>
    </li>
    <li><strong>Network Setup</strong>:
      <ul>
        <li>Localhost ports for player communication</li>
        <li>Buffer size for network packets</li>
      </ul>
    </li>
    <li><strong>Control Schemes</strong>:
      <ul>
        <li>Button mappings for all inputs</li>
        <li>Special move command sequences (fireball, dragon punch, etc.)</li>
      </ul>
    </li>
    <li><strong>AI Parameters</strong>:
      <ul>
        <li>Health-based behavior thresholds</li>
        <li>Move cooldowns and combo settings</li>
      </ul>
    </li>
    <li><strong>Logging</strong>:
      <ul>
        <li>Log file path and formatting</li>
        <li>Detail level configuration</li>
      </ul>
    </li>
  </ul>
  
  <p>Contains all tunable parameters for the fighting AI system.</p>
</div>


In [None]:
# Memory addresses for Street Fighter II Turbo (U) [smc]
MEMORY_ADDRESSES = dict(
    PLAYER1=dict(
        X_POS=0x7E0F80,   # Player 1 X coordinate
        Y_POS=0x7E0F82,   # Player 1 Y coordinate
        HEALTH=0x7E0F84,  # Player 1 health
        STATE=0x7E0F86,   # Player 1 state (standing, jumping, etc.)
    ),
    PLAYER2=dict(
        X_POS=0x7E0F88,   # Player 2 X coordinate
        Y_POS=0x7E0F8A,   # Player 2 Y coordinate
        HEALTH=0x7E0F8C,  # Player 2 health
        STATE=0x7E0F8E,   # Player 2 state
    ),
    GAME=dict(
        TIMER=0x7E0F90,        # Round timer
        ROUND=0x7E0F92,        # Current round
        ROUND_STATE=0x7E0F94,  # Round state (0=not started, 1=in progress, 2=over)
    ),
)

# Network configuration: host, one port per player, receive buffer size
NETWORK_CONFIG = dict(
    HOST='127.0.0.1',
    PORT_P1=9998,
    PORT_P2=10001,
    BUFFER_SIZE=4096,
)

# Button mappings: logical button name -> symbol used in command strings
BUTTON_MAPPINGS = dict(
    UP='^',
    DOWN='v',
    LEFT='<',
    RIGHT='>',
    Y='Y',
    B='B',
    A='A',
    X='X',
    L='L',
    R='R',
    SELECT='select',
    START='start',
)

# Special move inputs: each entry is a sequence of command strings, where
# "+" joins simultaneous symbols and a leading "!" marks a release
SPECIAL_MOVES = dict(
    FIREBALL=['<', '!<', 'v+<', '!v+!<', 'v', '!v', 'v+>', '!v+!>', '>+Y', '!>+!Y'],
    DRAGON_PUNCH=['>', '!>', 'v+>', '!v+!>', '^+Y', '!^+!Y'],
    SPINNING_KICK=['v', '!v', '^+B', '!^+!B'],
)

# Bot configuration
BOT_CONFIG = dict(
    DEFENSIVE_HEALTH_RATIO=0.5,
    AGGRESSIVE_HEALTH_RATIO=1.5,
    SPECIAL_MOVE_COOLDOWN=30,
    COMBO_LENGTH=3,
    REACTION_TIME=0.1,  # seconds
)

# Logging configuration
LOGGING_CONFIG = dict(
    LOG_FILE='bot.log',
    LOG_LEVEL='INFO',
    LOG_FORMAT='%(asctime)s - %(levelname)s - %(message)s',
)

<h2><center>Controller File</center></h2>
<div class="controller-description">
  <h3>Game Controller System</h3>
  
  <p><strong>Core Functionality:</strong></p>
  <ul>
    <li>Manages communication between AI agents and Street Fighter game emulator</li>
    <li>Handles both single-player and two-player modes</li>
    <li>Coordinates real-time gameplay data exchange</li>
  </ul>

  <p><strong>Key Components:</strong></p>
  <ul>
    <li><strong>Network Communication</strong>:
      <ul>
        <li>Socket-based connection to game emulator</li>
        <li>JSON serialization for game state and commands</li>
      </ul>
    </li>
    <li><strong>Input Processing</strong>:
      <ul>
        <li>Detailed button state logging (directions + 6 attack buttons)</li>
        <li>Human-readable button press visualization</li>
      </ul>
    </li>
    <li><strong>Game Loop</strong>:
      <ul>
        <li>Frame-by-frame state processing</li>
        <li>AI decision integration</li>
        <li>Round management</li>
      </ul>
    </li>
  </ul>
</div>


In [None]:
import socket
import json
from game_state import GameState
from bot import Bot
from data_recorder import DataRecorder
from logger import logger
from command import Command
from buttons import Buttons
import sys
import os
import threading
import time
from datetime import datetime

def log_action_buttons(player_num, buttons):
    """Log the state of the six action buttons for ``player_num``.

    Always logs one line with every action button's state (a missing
    attribute is reported as ``False``), then a second line naming the
    buttons that are currently pressed, if any.
    """
    labels = (
        ('Y', "Heavy Punch"),
        ('B', "Medium Punch"),
        ('A', "Light Punch"),
        ('X', "Heavy Kick"),
        ('L', "Medium Kick"),
        ('R', "Light Kick"),
    )

    # Snapshot of every action button, tolerant of missing attributes
    snapshot = {name: getattr(buttons, name, False) for name, _ in labels}
    summary = ", ".join(f"{name}={state}" for name, state in snapshot.items())
    logger.info(f"P{player_num} button states: " + summary)

    # Pressed buttons; plain attribute access, so a missing attribute raises here
    held = [f"{name} ({label})" for name, label in labels if getattr(buttons, name)]
    if not held:
        return

    player_type = "Human" if player_num == 1 else "AI"
    logger.info(f"Player {player_num} ({player_type}) ACTION buttons: {', '.join(held)}")

def button_state_to_string(buttons):
    """Describe the pressed buttons of ``buttons`` as a human-readable string.

    Returns ``"None"`` when nothing is pressed, otherwise a
    ``"Direction: ..."`` and/or ``"Action: ..."`` section joined by ``" | "``.
    """
    # Debug raw button access
    raw = {name: getattr(buttons, name, 'NotFound') for name in ('Y', 'B', 'A', 'X')}
    logger.debug(f"Raw button values - Y:{raw['Y']}, B:{raw['B']}, A:{raw['A']}, X:{raw['X']}")

    direction_labels = (
        ('up', "Up"),
        ('down', "Down"),
        ('left', "Left"),
        ('right', "Right"),
    )
    action_labels = (
        ('Y', "Y(Heavy Punch)"),
        ('B', "B(Medium Punch)"),
        ('A', "A(Light Punch)"),
        ('X', "X(Heavy Kick)"),
        ('L', "L(Medium Kick)"),
        ('R', "R(Light Kick)"),
    )

    # Directions are required attributes; action buttons default to unpressed
    directions = [label for attr, label in direction_labels if getattr(buttons, attr)]
    actions = [label for attr, label in action_labels if getattr(buttons, attr, False)]

    sections = []
    if directions:
        sections.append("Direction: " + ", ".join(directions))
    if actions:
        sections.append("Action: " + ", ".join(actions))

    return " | ".join(sections) if sections else "None"

def connect(port):
    """Wait for the game to connect on ``port`` and return the connected socket.

    Binds a TCP listener on 127.0.0.1, blocks until one client connects and
    returns that client's socket.  The listening socket is only needed for
    the handshake, so it is closed before returning; the accepted connection
    stays open and must be closed by the caller.

    Raises:
        OSError: if the port cannot be bound or accepting fails.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
        server_socket.bind(("127.0.0.1", port))
        server_socket.listen(5)
        client_socket, _ = server_socket.accept()
    print (f"Connected to game on port {port}!")
    return client_socket

def send(client_socket, command):
    """Serialize ``command`` as JSON and send it to the game over ``client_socket``.

    ``command`` must provide ``object_to_dict()``; the resulting dictionary is
    encoded with the default text encoding and written with ``sendall``.
    """
    encoded = json.dumps(command.object_to_dict()).encode()
    client_socket.sendall(encoded)

def receive(client_socket):
    """Read one game-state message from ``client_socket`` and return a ``GameState``.

    Reads up to 4096 bytes, decodes them as JSON and wraps the resulting
    dictionary in ``GameState``.

    Raises:
        ConnectionError: if the peer closed the connection (``recv`` returned
            no data), instead of failing later with an opaque JSON error.
        json.JSONDecodeError: if the payload is not valid JSON.
    """
    pay_load = client_socket.recv(4096)
    if not pay_load:
        # An empty read means the other side closed the socket.
        raise ConnectionError("Game closed the connection (empty payload received)")
    input_dict = json.loads(pay_load.decode())
    return GameState(input_dict)

class Player:
    """One player slot: its socket to the game, its bot and its latest frame data."""

    def __init__(self, player_number):
        """Create the bot and empty button/command objects for ``player_number``."""
        self.player_number = player_number
        # NOTE(review): both player numbers resolve to port 9999 here --
        # confirm whether player 2 is meant to use a different port.
        self.port = 9999
        self.client_socket = None
        self.bot = Bot(player_number)
        self.current_game_state = None
        self.buttons = Buttons()
        self.command = Command()
        self.connected = False

    def connect(self):
        """Wait for the game on ``self.port``; return True on success, False on error."""
        try:
            self.client_socket = connect(self.port)
        except Exception as e:
            logger.error(f"Player {self.player_number} connection error: {e}")
            return False
        self.connected = True
        return True

    def process_frame(self):
        """Receive one game state, let the bot choose buttons and send them back.

        Returns ``(game_state, buttons)``, or ``(None, None)`` when the player
        is not connected or the frame failed.  A failure is logged and marks
        the player as disconnected.
        """
        if not self.connected:
            return None, None

        try:
            state = receive(self.client_socket)
            self.current_game_state = state
            self.buttons = self.bot.fight(state, str(self.player_number))

            # Put the chosen buttons into this player's slot of a fresh command
            outgoing = Command()
            slot = "player_buttons" if self.player_number == 1 else "player2_buttons"
            setattr(outgoing, slot, self.buttons)
            self.command = outgoing

            send(self.client_socket, outgoing)
        except Exception as e:
            logger.error(f"Player {self.player_number} frame processing error: {e}")
            self.connected = False
            return None, None

        return self.current_game_state, self.buttons

    def disconnect(self):
        """Close the socket, if there is one, and mark the player as disconnected."""
        if not self.client_socket:
            return
        self.client_socket.close()
        self.connected = False

def _log_button_self_test():
    """Log a Buttons object with every action button held, as a logging sanity check."""
    test_buttons = Buttons()
    test_buttons.Y = True
    test_buttons.B = True
    test_buttons.A = True
    test_buttons.X = True
    test_buttons.L = True
    test_buttons.R = True

    logger.info("TEST BUTTONS - Created test buttons object with action buttons")
    log_action_buttons(999, test_buttons)  # Test player number 999


def _connect_players(player1, player2):
    """Connect the player(s) selected on the command line; return True on success.

    With ``1`` or ``2`` as the first argument only that player connects and
    the other one merely contributes its bot; otherwise both players connect.
    """
    single_player_mode = len(sys.argv) > 1 and sys.argv[1] in ['1', '2']

    if not single_player_mode:
        # Connect both players for two-player mode
        player1_connected = player1.connect()
        player2_connected = player2.connect()
        if not player1_connected or not player2_connected:
            logger.error("Failed to connect one or both players")
            return False
        return True

    if int(sys.argv[1]) == 1:
        if not player1.connect():
            logger.error("Failed to connect player 1")
            return False
        # Make sure player 2's bot is properly initialized as an opponent
        player2.bot = Bot(2)
        player2.connected = False  # Not physically connected
    else:
        if not player2.connect():
            logger.error("Failed to connect player 2")
            return False
        # Make sure player 1's bot is properly initialized as an opponent
        player1.bot = Bot(1)
        player1.connected = False  # Not physically connected
    return True


def _run_round(player1, player2, recorder):
    """Process and record frames until the round ends or no player is connected."""
    while True:
        # Reset both per-frame states up front: each branch below fills in
        # only one of them, and the selection afterwards reads both.
        game_state1 = None
        game_state2 = None

        if player1.connected:
            game_state1, p1_buttons = player1.process_frame()

            if game_state1 is not None:
                # Generate AI moves for player 2
                p2_buttons = player2.bot.fight(game_state1, "2")
                logger.info(f"AI (P2) pressed: {button_state_to_string(p2_buttons)}")
            else:
                p2_buttons = Buttons()
        elif player2.connected:
            game_state2, human_p2_buttons = player2.process_frame()

            if game_state2 is not None:
                # Generate AI moves (technically as player 1)
                ai_buttons = player1.bot.fight(game_state2, "1")
                logger.info(f"AI (recorded as P2) pressed: {button_state_to_string(ai_buttons)}")

                # Record human as player 1 and AI as player 2
                p1_buttons = human_p2_buttons
                p2_buttons = ai_buttons
            else:
                p1_buttons = Buttons()
                p2_buttons = Buttons()
        else:
            # No players connected
            p1_buttons = Buttons()
            p2_buttons = Buttons()

        # Use whichever game state is available
        game_state = game_state1 if game_state1 is not None else game_state2

        # Record the frame if we have a valid game state
        if game_state is not None:
            recorder.record_frame(
                game_state,
                p1_buttons,
                p2_buttons
            )

            # Debug output for both players' actions
            print(f"[{datetime.now().strftime('%H:%M:%S.%f')}] P1 buttons: {button_state_to_string(p1_buttons)}")
            print(f"[{datetime.now().strftime('%H:%M:%S.%f')}] P2 buttons (AI): {button_state_to_string(p2_buttons)}")

            # Log specific action button presses with more detail
            log_action_buttons(1, p1_buttons)
            log_action_buttons(2, p2_buttons)

        # Stop when the round is over or nobody is connected any more
        if (game_state is not None and game_state.is_round_over) or \
           (not player1.connected and not player2.connected):
            break

        # Small delay to prevent CPU hogging
        time.sleep(0.01)


def main():
    """Connect to the game, play and record one round, then release all resources.

    Sockets and the recorder are released on every exit path, including a
    failed connection attempt.
    """
    player1 = Player(1)
    player2 = Player(2)
    recorder = DataRecorder()

    try:
        _log_button_self_test()

        if not _connect_players(player1, player2):
            return

        try:
            _run_round(player1, player2, recorder)
            logger.info("Round finished")
        except KeyboardInterrupt:
            logger.info("Recording interrupted by user")
        except Exception as e:
            logger.error(f"Error during recording: {str(e)}")
    finally:
        player1.disconnect()
        player2.disconnect()
        recorder.close()


if __name__ == '__main__':
    main()


<h2><center>Data Recorder File</center></h2>
<div class="data-recorder">
  <h3>Game Data Recorder</h3>
  
  <p><strong>Core Functionality:</strong></p>
  <ul>
    <li>Comprehensive game state logging system for Street Fighter matches</li>
    <li>Records frame-by-frame gameplay data to CSV for analysis</li>
    <li>Tracks both player states and button inputs in real-time</li>
  </ul>

  <p><strong>Key Features:</strong></p>
  <ul>
    <li><strong>Detailed Data Capture</strong>:
      <ul>
        <li>Player positions, health, and movement states</li>
        <li>All button inputs (6 attack buttons + directions)</li>
        <li>Round timer and match progression</li>
      </ul>
    </li>
    <li><strong>CSV Storage</strong>:
      <ul>
        <li>Automatic file creation with headers</li>
        <li>Append-mode for continuous recording</li>
        <li>In-memory buffer with disk flushing</li>
      </ul>
    </li>
    <li><strong>Debugging Tools</strong>:
      <ul>
        <li>Periodic console output (every 60/180 frames)</li>
        <li>Button press visualization</li>
        <li>Detailed game state snapshots</li>
      </ul>
    </li>
  </ul>

  <p><strong>Data Structure:</strong> 43 columns covering player states, game progress, and all controller inputs.</p>
</div>


In [None]:
import csv
import os
from datetime import datetime
from logger import logger

class DataRecorder:
    """Record per-frame game state and controller input to a CSV file.

    Rows are appended to ``data/<filename>`` (``game_data.csv`` by default),
    flushed to disk after every frame, and also kept in ``self.records``.
    Button presses and periodic progress messages are printed to the console.

    Attributes:
        records: In-memory list of every row written so far (grows with the
            number of recorded frames).
        start_time: Creation time; row timestamps are seconds since this.
        filename: CSV file name inside the ``data`` directory.
        csv_file: Open file handle, or ``None`` once closed.
        csv_writer: ``csv.writer`` bound to ``csv_file``.
        frame_count: Number of frames passed to ``record_frame``.
        current_round: 1-based round counter.
    """

    # Column names, in the exact order record_frame() builds each row.
    HEADERS = (
        'timestamp', 'round', 'frame',
        # Player 1 state
        'p1_character', 'p1_health', 'p1_x', 'p1_y',
        'p1_jumping', 'p1_crouching', 'p1_in_move', 'p1_move_id',
        # Player 2 state
        'p2_character', 'p2_health', 'p2_x', 'p2_y',
        'p2_jumping', 'p2_crouching', 'p2_in_move', 'p2_move_id',
        # Game state
        'timer', 'has_round_started', 'is_round_over', 'fight_result',
        # Player 1 buttons
        'p1_up', 'p1_down', 'p1_left', 'p1_right',
        'p1_Y', 'p1_B', 'p1_A', 'p1_X', 'p1_L', 'p1_R',
        # Player 2 buttons
        'p2_up', 'p2_down', 'p2_left', 'p2_right',
        'p2_Y', 'p2_B', 'p2_A', 'p2_X', 'p2_L', 'p2_R',
    )

    # (console label, attribute name on the buttons object), in CSV column order.
    _BUTTONS = (
        ("Up", "up"),
        ("Down", "down"),
        ("Left", "left"),
        ("Right", "right"),
        ("Y (Heavy Punch)", "Y"),
        ("B (Medium Punch)", "B"),
        ("A (Light Punch)", "A"),
        ("X (Heavy Kick)", "X"),
        ("L (Medium Kick)", "L"),
        ("R (Light Kick)", "R"),
    )

    def __init__(self):
        self.records = []
        self.start_time = datetime.now()
        self.filename = "game_data.csv"  # Use a fixed filename
        self.csv_file = None
        self.csv_writer = None
        self.frame_count = 0
        self.current_round = 1
        # Value of is_round_over on the previous frame; used to count each
        # round end exactly once.
        self._round_over_seen = False

        # exist_ok avoids the check-then-create race of exists() + makedirs().
        os.makedirs('data', exist_ok=True)

        # Initialize CSV file
        self.initialize_csv()
        print(f"[{self._timestamp()}] Data Recorder initialized")

    @staticmethod
    def _timestamp():
        """Return the current wall-clock time as ``HH:MM:SS.ffffff``."""
        return datetime.now().strftime('%H:%M:%S.%f')

    @staticmethod
    def _player_fields(player):
        """Return the eight per-player CSV values in column order."""
        return [
            player.player_id,
            player.health,
            player.x_coord,
            player.y_coord,
            player.is_jumping,
            player.is_crouching,
            player.is_player_in_move,
            player.move_id,
        ]

    def initialize_csv(self):
        """Open the CSV file in append mode, writing headers if it has none."""
        filepath = os.path.join('data', self.filename)

        # A file that exists but is empty (e.g. left behind by an interrupted
        # run) still needs the header row.
        needs_header = (
            not os.path.isfile(filepath) or os.path.getsize(filepath) == 0
        )

        # Open file in append mode
        self.csv_file = open(filepath, 'a', newline='')
        self.csv_writer = csv.writer(self.csv_file)

        if needs_header:
            self.csv_writer.writerow(self.HEADERS)
            logger.info(f"Created new CSV file: {filepath}")
        else:
            logger.info(f"Appending to existing CSV file: {filepath}")

    def get_button_name(self, button_value):
        """Convert button value to readable name"""
        return "Pressed" if button_value else "Released"

    def print_button_state(self, player_num, button_name, state):
        """Print button state in a readable format"""
        if state:  # Only print when button is pressed
            print(f"[{self._timestamp()}] Player {player_num} {button_name}: {self.get_button_name(state)}")

    def print_game_state(self, game_state):
        """Print the round/timer status and both players' state."""
        print(f"\n[{self._timestamp()}] Game State Update:")
        print(f"Round: {self.current_round}, Timer: {game_state.timer}")
        print(f"Round Started: {game_state.has_round_started}, Round Over: {game_state.is_round_over}")

        for number, player in ((1, game_state.player1), (2, game_state.player2)):
            print(f"Player {number} - Character: {player.player_id}, Health: {player.health}")
            print(f"Position: ({player.x_coord}, {player.y_coord})")
            print(f"Jumping: {player.is_jumping}, Crouching: {player.is_crouching}")
            print(f"In Move: {player.is_player_in_move}, Move ID: {player.move_id}")

    def record_frame(self, game_state, player1_buttons, player2_buttons):
        """Record one frame: print diagnostics, append a CSV row, flush.

        Args:
            game_state: Object exposing ``player1``, ``player2``, ``timer``,
                ``has_round_started``, ``is_round_over`` and optionally
                ``fight_result``.
            player1_buttons: Object exposing the boolean button attributes
                ``up, down, left, right, Y, B, A, X, L, R``.
            player2_buttons: Same as ``player1_buttons`` for player 2.
        """
        self.frame_count += 1
        current_time = (datetime.now() - self.start_time).total_seconds()

        # Advance the round counter only on the frame where is_round_over
        # turns on. The flag may remain set for several consecutive frames,
        # and counting each of them would inflate the round number.
        round_over = bool(game_state.is_round_over)
        if round_over and not self._round_over_seen:
            self.current_round += 1
        self._round_over_seen = round_over

        # Heartbeat every 60 frames (roughly one second if the game runs at
        # 60 frames per second).
        if self.frame_count % 60 == 0:
            print(f"\n[{self._timestamp()}] Frame {self.frame_count} - Data still being received")

        # Full state dump every 180 frames.
        if self.frame_count % 180 == 0:
            self.print_game_state(game_state)

        # Report pressed buttons: all of player 1's, then all of player 2's.
        for player_num, buttons in ((1, player1_buttons), (2, player2_buttons)):
            for label, attr in self._BUTTONS:
                self.print_button_state(player_num, label, getattr(buttons, attr))

        # Get fight result (if available)
        fight_result = getattr(game_state, 'fight_result', 'None')

        # Build the row in the same order as HEADERS.
        row = [current_time, self.current_round, self.frame_count]
        row.extend(self._player_fields(game_state.player1))
        row.extend(self._player_fields(game_state.player2))
        row.extend([
            game_state.timer,
            game_state.has_round_started,
            game_state.is_round_over,
            fight_result,
        ])
        row.extend(getattr(player1_buttons, attr) for _, attr in self._BUTTONS)
        row.extend(getattr(player2_buttons, attr) for _, attr in self._BUTTONS)

        # Write to CSV
        self.csv_writer.writerow(row)
        self.csv_file.flush()  # Ensure data is written to disk

        # Also store in memory for potential analysis
        self.records.append(row)

    def close(self):
        """Close the CSV file; safe to call more than once."""
        csv_file = getattr(self, 'csv_file', None)
        if csv_file is None:
            return
        csv_file.close()
        # Drop the handle so a later close() (e.g. from __del__) is a no-op
        # instead of printing and logging the shutdown message again.
        self.csv_file = None
        print(f"[{self._timestamp()}] Data Recorder closed. Total frames recorded: {self.frame_count}")
        logger.info(f"Closed CSV file: {self.filename}")

    def __del__(self):
        """Ensure file is closed when object is destroyed"""
        self.close()

<h2><center>DQN File</center></h2>
<div class="dqn-system">
  <h3>Deep Q-Network (DQN) Agent</h3>
  
  <p><strong>Core Architecture:</strong></p>
  <ul>
    <li>Neural network with 3 fully-connected layers (256 → 128 → actions)</li>
    <li>DQN with a separate target network (policy + target networks)</li>
    <li>Experience replay buffer for stable training</li>
  </ul>

  <p><strong>Key Components:</strong></p>
  <ul>
    <li><strong>State Processing</strong>:
      <ul>
        <li>17-dimensional input (positions, health, states for both players)</li>
        <li>Automatic tensor conversion from game state</li>
      </ul>
    </li>
    <li><strong>Reward System</strong>:
      <ul>
        <li>Health difference between players (primary reward)</li>
        <li>Distance to opponent (secondary reward)</li>
        <li>Weighted combination for balanced learning</li>
      </ul>
    </li>
    <li><strong>Learning Mechanism</strong>:
      <ul>
        <li>Epsilon-greedy exploration (decaying over time)</li>
        <li>Batch training from experience replay</li>
        <li>Periodic target network updates</li>
      </ul>
    </li>
  </ul>

  <p><strong>Technical Specifications:</strong></p>
  <ul>
    <li>Hyperparameters:
      <ul>
        <li>γ=0.99 (discount factor)</li>
        <li>ε=1.0→0.01 (exploration decay)</li>
        <li>Learning rate=0.001</li>
        <li>Batch size=64</li>
      </ul>
    </li>
    <li>Built with PyTorch</li>
    <li>Full model save/load capability</li>
    <li>Integrated logging</li>
  </ul>

  <p><strong>Training Flow:</strong> Observe → Store → Sample → Train → Update</p>
</div>


In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
import random
from logger import logger

class DQN(nn.Module):
    """Three-layer fully connected Q-network.

    Maps an ``input_size``-wide feature vector through hidden layers of 256
    and 128 units (ReLU after each) to ``output_size`` raw Q-values.
    """

    def __init__(self, input_size, output_size):
        super().__init__()
        # Attribute names are part of the state_dict keys used by saved
        # checkpoints, so they must stay fc1/fc2/fc3.
        self.fc1 = nn.Linear(input_size, 256)
        self.fc2 = nn.Linear(256, 128)
        self.fc3 = nn.Linear(128, output_size)

    def forward(self, x):
        """Return one Q-value per action for each row of ``x``."""
        hidden = nn.functional.relu(self.fc1(x))
        hidden = nn.functional.relu(self.fc2(hidden))
        q_values = self.fc3(hidden)  # no activation on the output layer
        return q_values

class ReplayBuffer:
    """Bounded FIFO store of ``(state, action, reward, next_state, done)`` tuples.

    Once ``capacity`` transitions are held, each new one evicts the oldest.
    """

    def __init__(self, capacity):
        self.buffer = deque((), capacity)

    def push(self, state, action, reward, next_state, done):
        """Append one transition to the buffer."""
        transition = (state, action, reward, next_state, done)
        self.buffer.append(transition)

    def sample(self, batch_size):
        """Return ``batch_size`` distinct transitions chosen at random."""
        chosen = random.sample(self.buffer, batch_size)
        return chosen

    def __len__(self):
        """Return the number of transitions currently stored."""
        stored = len(self.buffer)
        return stored

class DQNAgent:
    """DQN agent with a policy network, a target network and a replay buffer.

    Args:
        state_size: Length of the feature vector produced by ``get_state``.
        action_size: Number of discrete actions the agent can choose from.
        player_number: 1 or 2; selects which side of the game state is
            treated as "own" and which as "opponent" (any value other than 1
            is treated as player 2).
    """

    def __init__(self, state_size, action_size, player_number):
        self.state_size = state_size
        self.action_size = action_size
        self.player_number = player_number

        # Hyperparameters
        self.gamma = 0.99            # discount factor
        self.epsilon = 1.0           # current exploration probability
        self.epsilon_min = 0.01      # floor for epsilon
        self.epsilon_decay = 0.995   # multiplicative decay per train() step
        self.learning_rate = 0.001
        self.batch_size = 64
        self.memory = ReplayBuffer(10000)

        # Initialize networks; the target starts as a copy of the policy net.
        self.policy_net = DQN(state_size, action_size)
        self.target_net = DQN(state_size, action_size)
        self.target_net.load_state_dict(self.policy_net.state_dict())

        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=self.learning_rate)

    def _split_players(self, game_state):
        """Return ``(own_player, opponent)`` for this agent's side."""
        if self.player_number == 1:
            return game_state.player1, game_state.player2
        return game_state.player2, game_state.player1

    @staticmethod
    def _player_features(player):
        """Return the seven per-player features in state-vector order."""
        return [
            player.x_coord,
            player.y_coord,
            player.health,
            player.is_jumping,
            player.is_crouching,
            player.is_player_in_move,
            player.move_id,
        ]

    def get_state(self, game_state):
        """Convert a game state into a ``[1, 17]`` float32 tensor.

        Layout: 7 own-player features, 7 opponent features, then ``timer``,
        ``has_round_started`` and ``is_round_over``. Boolean flags become
        0.0/1.0 through the float32 conversion.
        """
        player, opponent = self._split_players(game_state)

        state = np.array(
            self._player_features(player)
            + self._player_features(opponent)
            + [
                game_state.timer,
                game_state.has_round_started,
                game_state.is_round_over,
            ],
            dtype=np.float32,
        )

        # Convert to tensor and add batch dimension
        return torch.from_numpy(state).unsqueeze(0)

    def get_reward(self, game_state, next_game_state):
        """Return the reward for the transition between two game states.

        The reward is ``10 * (damage dealt - damage taken)`` plus
        ``0.1 * (reduction in horizontal distance to the opponent)``.
        """
        player, opponent = self._split_players(game_state)
        next_player, next_opponent = self._split_players(next_game_state)

        # Health lost by each side during the transition. Damage dealt to the
        # opponent must count positively and damage taken negatively; the
        # subtraction order here is what gives the reward the right sign.
        damage_dealt = opponent.health - next_opponent.health
        damage_taken = player.health - next_player.health
        health_diff = damage_dealt - damage_taken

        # Positive when the agent has moved closer to the opponent.
        current_dist = abs(player.x_coord - opponent.x_coord)
        next_dist = abs(next_player.x_coord - next_opponent.x_coord)
        dist_diff = current_dist - next_dist

        # Combine rewards
        return health_diff * 10 + dist_diff * 0.1

    def select_action(self, state):
        """Select an action index using an epsilon-greedy policy."""
        if random.random() < self.epsilon:
            return random.randrange(self.action_size)

        with torch.no_grad():
            return self.policy_net(state).argmax().item()

    def train(self):
        """Run one optimisation step on a sampled batch of experiences.

        Returns:
            The scalar loss as a float, or ``None`` when the replay buffer
            holds fewer than ``batch_size`` transitions (no step is taken).
        """
        if len(self.memory) < self.batch_size:
            return None

        batch = self.memory.sample(self.batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)

        # Concatenate along the batch axis. Assumes every stored state has
        # shape [1, state_size], as produced by get_state().
        states = torch.cat(states)
        next_states = torch.cat(next_states)

        actions = torch.tensor(actions, dtype=torch.long)
        rewards = torch.tensor(rewards, dtype=torch.float32)
        dones = torch.tensor(dones, dtype=torch.float32)

        # Q(s_t, a) for the action actually taken. squeeze(1) removes only the
        # gathered column, so a batch of size 1 keeps shape [1] instead of
        # collapsing to a scalar and broadcasting against the target.
        current_q_values = self.policy_net(states).gather(1, actions.unsqueeze(1)).squeeze(1)

        # Bootstrapped target from the target network; terminal transitions
        # (done == 1) contribute only their immediate reward.
        with torch.no_grad():
            next_q_values = self.target_net(next_states).max(1)[0]
            target_q_values = rewards + (1 - dones) * self.gamma * next_q_values

        loss = nn.functional.mse_loss(current_q_values, target_q_values)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Decay exploration once per training step, down to epsilon_min.
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

        return loss.item()

    def update_target_network(self):
        """Copy the policy network's weights into the target network."""
        self.target_net.load_state_dict(self.policy_net.state_dict())

    def save_model(self, path):
        """Save both networks, the optimizer state and epsilon to ``path``.

        When ``path`` is a filesystem path, its parent directory is created
        if it does not exist yet.
        """
        import os  # local import: only this method needs it

        if isinstance(path, (str, os.PathLike)):
            parent = os.path.dirname(os.fspath(path))
            if parent:
                os.makedirs(parent, exist_ok=True)

        torch.save({
            'policy_net_state_dict': self.policy_net.state_dict(),
            'target_net_state_dict': self.target_net.state_dict(),
            'optimizer_state_dict': self.optimizer.state_dict(),
            'epsilon': self.epsilon
        }, path)
        logger.info(f"Saved DQN model to {path}")

    def load_model(self, path):
        """Restore networks, optimizer state and epsilon from ``path``.

        Raises whatever ``torch.load`` raises when the file is missing or
        unreadable, and ``KeyError`` when the checkpoint lacks an expected
        entry.
        """
        # NOTE: torch.load unpickles the file; only load checkpoints from a
        # trusted source.
        # map_location places tensors on the device the networks live on, so
        # a checkpoint written on another device still loads.
        device = next(self.policy_net.parameters()).device
        checkpoint = torch.load(path, map_location=device)
        self.policy_net.load_state_dict(checkpoint['policy_net_state_dict'])
        self.target_net.load_state_dict(checkpoint['target_net_state_dict'])
        self.optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        self.epsilon = checkpoint['epsilon']
        logger.info(f"Loaded DQN model from {path}")


<div class="gamestate-description">
  <h3>GameState Class</h3>
  
  <p><strong>Core Functionality:</strong></p>
  <ul>
    <li>Represents the complete state of a Street Fighter match</li>
    <li>Converts dictionary input into structured game objects</li>
    <li>Tracks match progression and player states</li>
  </ul>

  <p><strong>Key Attributes:</strong></p>
  <ul>
    <li><strong>Player States</strong>:
      <ul>
        <li>player1: All data for Player 1 (position, health, etc.)</li>
        <li>player2: All data for Player 2</li>
      </ul>
    </li>
    <li><strong>Match Information</strong>:
      <ul>
        <li>timer: Current round timer</li>
        <li>fight_result: Match outcome</li>
        <li>has_round_started: Round status flag</li>
        <li>is_round_over: Round completion flag</li>
      </ul>
    </li>
  </ul>

  <p><strong>Details:</strong></p>
  <ul>
    <li>Initializes from JSON/dictionary input</li>
    <li>Creates Player objects for each fighter</li>
    <li>Provides structured access to all game state variables</li>
  </ul>

  <p><strong>Usage:</strong> Serves as the central data structure for game state information throughout the AI system.</p>
</div>


In [None]:
    from player import Player
    
    class GameState:
        """Snapshot of a match built from a dictionary.

        Attributes set by ``dict_to_object``: ``player1``, ``player2``,
        ``timer``, ``fight_result``, ``has_round_started``, ``is_round_over``.
        """

        def __init__(self, input_dict):
            """Populate the instance from ``input_dict``."""
            self.dict_to_object(input_dict)

        def dict_to_object(self, input_dict):
            """Copy the entries of ``input_dict`` onto attributes of ``self``.

            Expects the keys ``p1``, ``p2``, ``timer``, ``result``,
            ``round_started`` and ``round_over``; a missing key raises
            ``KeyError``.
            """
            # The two player sub-dictionaries are wrapped in Player objects.
            self.player1 = Player(input_dict['p1'])
            self.player2 = Player(input_dict['p2'])

            # Remaining entries are copied as-is: (attribute name, dict key).
            scalar_fields = (
                ('timer', 'timer'),
                ('fight_result', 'result'),
                ('has_round_started', 'round_started'),
                ('is_round_over', 'round_over'),
            )
            for attribute, key in scalar_fields:
                setattr(self, attribute, input_dict[key])

<h2><center>Logger File</center></h2>
<div class="logging-module">
  <h3>Logging System</h3>
  
  <p><strong>Core Functionality:</strong></p>
  <ul>
    <li>Centralized logging system for the Street Fighter AI project</li>
    <li>Dual output (file + console) with configurable levels</li>
    <li>Structured logging for different event types</li>
  </ul>

  <p><strong>Key Features:</strong></p>
  <ul>
    <li><strong>Automatic Setup</strong>:
      <ul>
        <li>Creates 'logs' directory if missing</li>
        <li>Configures from central config file</li>
        <li>Pre-configured logger instance</li>
      </ul>
    </li>
    <li><strong>Specialized Loggers</strong>:
      <ul>
        <li>Game state snapshots</li>
        <li>Bot action tracking</li>
        <li>Error/warning handling</li>
        <li>Debug messages</li>
      </ul>
    </li>
    <li><strong>Configuration</strong>:
      <ul>
        <li>Log file name (written inside the 'logs' directory)</li>
        <li>Logging level control</li>
        <li>Custom format strings</li>
      </ul>
    </li>
  </ul>
</div>


In [None]:
import logging
import os
from config import LOGGING_CONFIG

def setup_logger():
    """Configure file and console logging and return this module's logger.

    Reads ``LOG_FILE``, ``LOG_LEVEL`` and ``LOG_FORMAT`` from
    ``LOGGING_CONFIG``. The log file is written inside the ``logs``
    directory, which is created if missing. A console handler at INFO level
    is attached to the root logger; calling this function again does not
    attach a second one.

    Returns:
        The ``logging.Logger`` named after this module.
    """
    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs('logs', exist_ok=True)

    log_format = LOGGING_CONFIG['LOG_FORMAT']

    # Configure logging to the file. basicConfig does nothing if the root
    # logger already has handlers.
    logging.basicConfig(
        filename=os.path.join('logs', LOGGING_CONFIG['LOG_FILE']),
        level=getattr(logging, LOGGING_CONFIG['LOG_LEVEL']),
        format=log_format
    )

    # Also log to console. The handler is tagged with a name so a repeated
    # call can detect it and skip adding a duplicate, which would print every
    # message twice.
    root_logger = logging.getLogger('')
    console_name = f"{__name__}.console"
    if not any(handler.get_name() == console_name for handler in root_logger.handlers):
        console = logging.StreamHandler()
        console.set_name(console_name)
        console.setLevel(logging.INFO)
        console.setFormatter(logging.Formatter(log_format))
        root_logger.addHandler(console)

    return logging.getLogger(__name__)

# Create the shared logger instance once, at import time; the helper
# functions below log through it.
logger = setup_logger()

def log_game_state(game_state):
    """Log the round, timer and both players' health and position at INFO level.

    Args:
        game_state: Object exposing ``timer``, ``player1`` and ``player2``
            (each with ``health``, ``x_coord``, ``y_coord``). A ``round``
            attribute is used when present.
    """
    # Not every game-state object carries a round number, so fall back to a
    # placeholder instead of raising AttributeError.
    round_number = getattr(game_state, 'round', 'N/A')
    logger.info(f"Game State - Round: {round_number}, Timer: {game_state.timer}")

    for number, player in ((1, game_state.player1), (2, game_state.player2)):
        logger.info(f"Player {number} - Health: {player.health}, Position: ({player.x_coord}, {player.y_coord})")

def log_bot_action(action, player):
    """Log, at INFO level, the action chosen by the bot for ``player``."""
    message = f"Bot Action - Player {player}: {action}"
    logger.info(message)

def log_error(error_msg, exc_info=None):
    """Log ``error_msg`` at ERROR level.

    ``exc_info`` is forwarded unchanged to the logging call.
    """
    logger.log(logging.ERROR, error_msg, exc_info=exc_info)

def log_warning(warning_msg):
    """Log ``warning_msg`` at WARNING level."""
    logger.log(logging.WARNING, warning_msg)

def log_debug(debug_msg):
    """Log ``debug_msg`` at DEBUG level."""
    logger.log(logging.DEBUG, debug_msg)


<div class="player-class">
  <h3>Player Class</h3>
  
  <p><strong>Core Functionality:</strong></p>
  <ul>
    <li>Represents a fighter's complete in-game state</li>
    <li>Converts dictionary input to object attributes</li>
    <li>Integrates with button input system</li>
  </ul>

  <p><strong>Key Attributes:</strong></p>
  <ul>
    <li><strong>Character Info</strong>:
      <ul>
        <li>player_id: Character identifier</li>
        <li>health: Current health points</li>
      </ul>
    </li>
    <li><strong>Position & State</strong>:
      <ul>
        <li>x_coord/y_coord: Screen position</li>
        <li>is_jumping: Jump state flag</li>
        <li>is_crouching: Crouch state flag</li>
      </ul>
    </li>
    <li><strong>Combat System</strong>:
      <ul>
        <li>player_buttons: Current button inputs (Buttons object)</li>
        <li>is_player_in_move: Move execution flag</li>
        <li>move_id: Active move identifier</li>
      </ul>
    </li>
  </ul>

  <p><strong>Usage:</strong> Central component for tracking and managing player state throughout match execution.</p>
</div>


In [None]:
from buttons import Buttons

class Player:
    """State of one fighter, built from a dictionary.

    Attributes set by ``dict_to_object``: ``player_id``, ``health``,
    ``x_coord``, ``y_coord``, ``is_jumping``, ``is_crouching``,
    ``player_buttons``, ``is_player_in_move``, ``move_id``.
    """

    def __init__(self, player_dict):
        """Populate the instance from ``player_dict``."""
        self.dict_to_object(player_dict)

    def dict_to_object(self, player_dict):
        """Copy the entries of ``player_dict`` onto attributes of ``self``.

        Expects the keys ``character``, ``health``, ``x``, ``y``,
        ``jumping``, ``crouching``, ``buttons``, ``in_move`` and ``move``;
        a missing key raises ``KeyError``.
        """
        # Entries copied as-is: (attribute name, dict key).
        leading_fields = (
            ('player_id', 'character'),
            ('health', 'health'),
            ('x_coord', 'x'),
            ('y_coord', 'y'),
            ('is_jumping', 'jumping'),
            ('is_crouching', 'crouching'),
        )
        for attribute, key in leading_fields:
            setattr(self, attribute, player_dict[key])

        # The button entry is wrapped in a Buttons object.
        self.player_buttons = Buttons(player_dict['buttons'])

        trailing_fields = (
            ('is_player_in_move', 'in_move'),
            ('move_id', 'move'),
        )
        for attribute, key in trailing_fields:
            setattr(self, attribute, player_dict[key])
