In [3]:
import vizdoom as vzd
import os
import time
import random
import json
import numpy as np
import cv2
from datetime import datetime
from pynput import keyboard


In [4]:
# Name of the Doom map to load from the scenario WAD. Read as a module-level
# global by DoomDataCollectorSimple._setup_manual_config, so this cell must run
# before setup_game() is called.
selected_map = "map01"

In [5]:

class DoomDataCollectorSimple:
    """Record manually played ViZDoom episodes as PNG frames plus JSON metadata.

    Every recorded frame is written to ``<data_dir>/frames`` and described by a
    metadata dict that holds, among other things, the 3-button action
    (move left, move right, attack) derived from keyboard state tracked with
    ``pynput``.  One JSON file per recording goes to ``<data_dir>/episodes``.

    Typical use: ``setup_game()`` -> ``record_episode()`` -> ``close()``.
    """

    # Labels for the action vector, in the same order in which the buttons are
    # registered in _setup_manual_config.
    ACTION_NAMES = ['move_left', 'move_right', 'attack']

    def __init__(self, data_dir="my_gameplay_data", doom_map=None):
        """Create the output directories and initialise the recorder state.

        Args:
            data_dir: Root directory for recorded data; ``frames`` and
                ``episodes`` sub-directories are created inside it.
            doom_map: Optional map name to load.  When ``None`` the
                module-level ``selected_map`` is used at setup time.
        """
        self.data_dir = data_dir
        self.doom_map = doom_map
        self.game = None
        self.episode_data = []
        self.episode_count = 0
        self.frame_count = 0
        # Set per recording; initialised here so _save_episode_data never hits
        # a missing attribute.
        self.episode_timestamp = None
        # Set by the keyboard listener when ESC is released; polled by the
        # recording loop.
        self.stop_requested = False

        # Simplified keyboard state tracking - only 3 controls
        self.key_states = {
            'left': False,    # Move left
            'right': False,   # Move right
            'ctrl': False     # Attack
        }

        # Create data directories
        os.makedirs(self.data_dir, exist_ok=True)
        os.makedirs(os.path.join(self.data_dir, "frames"), exist_ok=True)
        os.makedirs(os.path.join(self.data_dir, "episodes"), exist_ok=True)

        # Created lazily by start_keyboard_listener
        self.keyboard_listener = None

    def setup_game(self):
        """Create, configure and initialise the ViZDoom game for manual play.

        Returns:
            The initialised ``vzd.DoomGame`` instance (also kept in ``self.game``).
        """
        self.game = vzd.DoomGame()

        # Scenario files shipped inside the installed vizdoom package
        scenarios_path = os.path.join(os.path.dirname(vzd.__file__), 'scenarios')

        # The game is always configured by hand; no .cfg file is looked up.
        print("Setting up manual configuration...")
        self._setup_manual_config(scenarios_path)

        # Configure for manual play
        self.game.set_window_visible(True)
        self.game.set_sound_enabled(True)
        self.game.set_render_hud(True)
        self.game.set_render_crosshair(True)
        self.game.set_render_weapon(True)

        # SPECTATOR mode is what is actually configured here.
        self.game.set_mode(vzd.Mode.SPECTATOR)

        # Initialize the game
        self.game.init()

        print("Game initialized for manual play with SIMPLIFIED controls!")
        print("Controls:")
        print("  ←: Move left")
        print("  →: Move right")
        print("  CTRL: Attack/Shoot")
        print("  ESC: Exit game")
        print("\nGame window should be open. Click on it and start playing!")
        print("\nNOTE: If you see 'not trusted' error, add your terminal to macOS Accessibility settings!")

        return self.game

    def _setup_manual_config(self, scenarios_path):
        """Configure scenario, map, screen, buttons and rewards by hand.

        Args:
            scenarios_path: Directory that contains ``basic.wad``.
        """
        basic_wad_path = os.path.join(scenarios_path, 'basic.wad')
        self.game.set_doom_scenario_path(basic_wad_path)

        # Prefer the map given to the constructor; otherwise fall back to the
        # notebook-level ``selected_map`` global (the original behaviour).
        map_name = self.doom_map if self.doom_map is not None else selected_map
        self.game.set_doom_map(map_name)

        # Configure screen. _save_frame_safely relies on the RGB channel order
        # chosen here.
        self.game.set_screen_resolution(vzd.ScreenResolution.RES_640X480)
        self.game.set_screen_format(vzd.ScreenFormat.RGB24)

        # Add only the 3 simplified buttons. The order must match ACTION_NAMES
        # and the vector built by _key_states_to_action.
        self.game.add_available_button(vzd.Button.MOVE_LEFT)       # Left arrow
        self.game.add_available_button(vzd.Button.MOVE_RIGHT)      # Right arrow
        self.game.add_available_button(vzd.Button.ATTACK)          # Ctrl

        # Set game parameters
        self.game.set_episode_timeout(2000)
        self.game.set_living_reward(-1)

    def _tracked_key_name(self, key):
        """Return the ``key_states`` entry that ``key`` maps to, or ``None``."""
        if key == keyboard.Key.left:
            return 'left'
        if key == keyboard.Key.right:
            return 'right'
        # Some platforms report the generic Ctrl key instead of a sided one.
        if key in (keyboard.Key.ctrl, keyboard.Key.ctrl_l, keyboard.Key.ctrl_r):
            return 'ctrl'
        return None

    def _on_key_press(self, key):
        """Handle key press events - simplified to 3 keys"""
        name = self._tracked_key_name(key)
        if name is not None:
            self.key_states[name] = True

    def _on_key_release(self, key):
        """Handle key release events; releasing ESC requests the end of recording.

        Returns:
            ``False`` when ESC was released (which stops the pynput listener),
            otherwise ``None``.
        """
        name = self._tracked_key_name(key)
        if name is not None:
            self.key_states[name] = False
        elif key == keyboard.Key.esc:
            # Returning False only stops the listener thread, so also raise a
            # flag that the recording loop polls.
            self.stop_requested = True
            return False

    def _key_states_to_action(self):
        """Convert key states to VizDoom action vector - simplified to 3 actions"""
        # Map key states to button indices (matching the order we added buttons)
        action = [
            self.key_states['left'],    # MOVE_LEFT
            self.key_states['right'],   # MOVE_RIGHT
            self.key_states['ctrl']     # ATTACK
        ]
        return action

    def start_keyboard_listener(self):
        """Start listening for keyboard input.

        Returns:
            ``True`` if the listener was started, ``False`` if starting failed.
        """
        # Never keep two listeners alive at once.
        if self.keyboard_listener is not None:
            self.stop_keyboard_listener()

        # A key held while the previous listener stopped never got its release
        # event, so start from a clean state.
        for name in self.key_states:
            self.key_states[name] = False

        try:
            listener = keyboard.Listener(
                on_press=self._on_key_press,
                on_release=self._on_key_release
            )
            listener.start()
            self.keyboard_listener = listener
            print("Keyboard listener started!")
            return True
        except Exception as e:
            print(f"Failed to start keyboard listener: {e}")
            print("You may need to add your terminal to macOS Accessibility settings")
            return False

    def stop_keyboard_listener(self):
        """Stop listening for keyboard input (safe to call repeatedly)."""
        if self.keyboard_listener is not None:
            self.keyboard_listener.stop()
            # Drop the reference so a later stop/close does not stop it again.
            self.keyboard_listener = None
            print("Keyboard listener stopped!")

    def _save_frame_safely(self, screen_buffer, frame_path):
        """Write one screen buffer to ``frame_path`` as an image file.

        Accepts 3-channel buffers in HWC or CHW layout.  The buffer is assumed
        to be RGB (as configured in ``_setup_manual_config``) and is converted
        to the BGR order that ``cv2.imwrite`` expects.

        Returns:
            ``True`` if the file was written, ``False`` on any failure.
        """
        try:
            if getattr(screen_buffer, 'ndim', None) != 3:
                print(f"Unexpected image format: {getattr(screen_buffer, 'shape', None)}")
                return False

            # Check HWC first: it is the layout of the configured RGB24 format,
            # and a frame of height 3 would otherwise be mistaken for CHW.
            if screen_buffer.shape[2] == 3:
                image_rgb = screen_buffer
            elif screen_buffer.shape[0] == 3:
                # CHW (Channels, Height, Width) -> HWC
                image_rgb = np.transpose(screen_buffer, (1, 2, 0))
            else:
                print(f"Unexpected image format: {screen_buffer.shape}")
                return False

            # Reverse the channel axis (RGB -> BGR) and make the result
            # contiguous before handing it to OpenCV.
            image_bgr = np.ascontiguousarray(image_rgb[:, :, ::-1])

            # imwrite signals most failures through its return value rather
            # than by raising.
            if not cv2.imwrite(frame_path, image_bgr):
                print(f"Error saving frame: could not write {frame_path}")
                return False
            return True
        except Exception as e:
            print(f"Error saving frame: {e}")
            return False

    def _record_frame(self, state):
        """Save the frame of ``state``, step the game once and log the metadata."""
        screen_buffer = state.screen_buffer
        game_variables = state.game_variables

        # Get current action from keyboard input
        action = self._key_states_to_action()

        frame_filename = f"ep{self.episode_count:03d}_frame{self.frame_count:06d}_{self.episode_timestamp}.png"
        frame_path = os.path.join(self.data_dir, "frames", frame_filename)
        saved = self._save_frame_safely(screen_buffer, frame_path)

        frame_data = {
            'frame': self.frame_count,
            'timestamp': time.time(),
            'screen_shape': [int(dim) for dim in screen_buffer.shape],
            'action': action,
            'action_names': list(self.ACTION_NAMES),  # For clarity
            # Explicit None check: truth-testing a multi-element array raises.
            'game_variables': [] if game_variables is None else np.asarray(game_variables).tolist(),
            'episode': self.episode_count,
            'frame_path': frame_path if saved else None,
        }
        self.episode_data.append(frame_data)

        # Make action in the game
        frame_data['reward'] = self.game.make_action(action)
        # Also keep the action as reported by the game itself.
        # NOTE(review): added because the keyboard-derived 'action' stays all
        # False when OS-level key monitoring is not permitted - confirm that
        # downstream consumers tolerate the extra key.
        frame_data['game_action'] = list(self.game.get_last_action())

        self.frame_count += 1

    def record_episode(self, max_frames=5000):
        """Record one session of manual gameplay.

        Recording ends after ``max_frames`` frames, when ESC is released, or on
        ``KeyboardInterrupt``.  When the game reports no state, a new game
        episode is started and recording continues in the same session.

        Args:
            max_frames: Maximum number of frames to record.

        Returns:
            Number of frames recorded.

        Raises:
            RuntimeError: If ``setup_game()`` has not been called.
        """
        if self.game is None:
            raise RuntimeError("Game is not initialized; call setup_game() first.")

        print(f"\n=== Starting Episode {self.episode_count + 1} (SIMPLIFIED CONTROLS) ===")
        print("Play manually! Data is being recorded...")
        print("Controls: ← (left), → (right), CTRL (attack), ESC (exit)")

        # Generate timestamp for this episode (used for both frames and JSON)
        self.episode_timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')

        # Reset episode data
        self.episode_data = []
        self.frame_count = 0
        self.stop_requested = False

        # Start keyboard listener
        if not self.start_keyboard_listener():
            print("Continuing without keyboard input capture...")

        try:
            # Start new episode
            self.game.new_episode()

            while self.frame_count < max_frames and not self.stop_requested:
                state = self.game.get_state()
                if state is None:
                    print("No state available, starting new episode...")
                    self.game.new_episode()
                    continue

                self._record_frame(state)

                # Print progress every 100 frames
                if self.frame_count % 100 == 0:
                    print(f"Frame {self.frame_count} - Recording...")
        except KeyboardInterrupt:
            print("\nEpisode interrupted by user")
        finally:
            # Runs on every exit path so the listener thread is not leaked and
            # already recorded frames are not lost when the game raises.
            self.stop_keyboard_listener()
            if self.episode_data:
                self._save_episode_data()

        print(f"Episode {self.episode_count + 1} finished! Total frames: {self.frame_count}")
        self.episode_count += 1

        return self.frame_count

    def _save_episode_data(self):
        """Save episode data to JSON file"""
        # Allow saving even if no recording has set a timestamp yet.
        if self.episode_timestamp is None:
            self.episode_timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')

        episode_filename = f"episode_simple_{self.episode_count:03d}_{self.episode_timestamp}.json"
        episode_path = os.path.join(self.data_dir, "episodes", episode_filename)

        with open(episode_path, 'w') as f:
            json.dump(self.episode_data, f, indent=2)

        print(f"Episode data saved to: {episode_path}")
        print(f"Total frames recorded: {len(self.episode_data)}")

    def close(self):
        """Close the game and cleanup (safe to call repeatedly)."""
        self.stop_keyboard_listener()
        if self.game:
            self.game.close()
            # Drop the reference so a second close() is a no-op.
            self.game = None
            print("Game closed successfully.")


In [7]:
# Bind the name up front: if the constructor itself raises, the `finally`
# clause must not fail with a NameError that hides the real error.
collector = None
try:
    # Create data collector
    collector = DoomDataCollectorSimple(data_dir="my_gameplay_data")

    # Setup the game
    collector.setup_game()

    # Record up to 10000 frames of manual play
    collector.record_episode(max_frames=10000)

except Exception as e:
    print(f"Error: {e}")
finally:
    # Always release the game window and keyboard listener
    if collector is not None:
        collector.close()


Config file not found, setting up manual configuration...


This process is not trusted! Input event monitoring will not be possible until it is added to accessibility clients.


Game initialized for manual play with SIMPLIFIED controls!
Controls:
  ←: Move left
  →: Move right
  CTRL: Attack/Shoot
  ESC: Exit game

Game window should be open. Click on it and start playing!

NOTE: If you see 'not trusted' error, add your terminal to macOS Accessibility settings!

=== Starting Episode 1 (SIMPLIFIED CONTROLS) ===
Play manually! Data is being recorded...
Controls: ← (left), → (right), CTRL (attack), ESC (exit)
Keyboard listener started!
No state available, starting new episode...
No state available, starting new episode...
Frame 100 - Recording...
No state available, starting new episode...
No state available, starting new episode...
Frame 200 - Recording...
No state available, starting new episode...
No state available, starting new episode...
No state available, starting new episode...
Frame 300 - Recording...
No state available, starting new episode...
No state available, starting new episode...
Frame 400 - Recording...
No state available, starting new episode.