# Debug Grouped Environment Features

This notebook helps you understand how `train_lin_grouped.py` initializes the environment and computes board features such as holes, bumpiness, and column heights.

In [None]:
import gymnasium as gym
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

from tetris_gymnasium.envs import Tetris
from tetris_gymnasium.wrappers.grouped import GroupedActionsObservations
from tetris_gymnasium.wrappers.observation import FeatureVectorObservation

## 1. Create Environment (same as train_lin_grouped.py)

In [None]:
# Build the environment with the wrapper stack this notebook mirrors from
# train_lin_grouped.py: base Tetris env -> grouped actions whose observations
# are feature vectors -> episode-statistics recording.
base_env = gym.make("tetris_gymnasium/Tetris", render_mode="rgb_array", gravity=False)
feature_wrapper = FeatureVectorObservation(base_env)
grouped_env = GroupedActionsObservations(
    base_env, observation_wrappers=[feature_wrapper]
)
env = gym.wrappers.RecordEpisodeStatistics(grouped_env)

print(f"Observation space: {env.observation_space}")
print(f"Action space: {env.action_space}")

## 2. Reset and Inspect Initial State

In [None]:
# Start a seeded episode and describe the observation it returns.
obs, info = env.reset(seed=42)

banner = "=" * 50
print(banner)
print("OBSERVATION (obs)")
print(banner)
print(f"Type: {type(obs)}")
# Non-array observations have no shape/dtype; report 'N/A' for those.
print(f"Shape: {getattr(obs, 'shape', 'N/A')}")
print(f"Dtype: {getattr(obs, 'dtype', 'N/A')}")
# Only slice when the observation supports indexing; otherwise show it whole.
obs_preview = obs[:3] if hasattr(obs, '__getitem__') else obs
print(f"\nFirst 3 rows of obs:\n{obs_preview}")

In [None]:
# Dump every entry of the `info` dict: its type plus a shape/length/value summary.
rule = "=" * 50
print(rule)
print("INFO dict keys")
print(rule)
print(f"Keys: {info.keys()}")

for name, entry in info.items():
    print(f"\n--- {name} ---")
    print(f"Type: {type(entry)}")

    # Array-likes: the shape is the summary.
    if hasattr(entry, 'shape'):
        print(f"Shape: {entry.shape}")
        continue

    # Anything that is neither array-like nor a sequence is printed verbatim.
    if not isinstance(entry, (list, tuple)):
        print(f"Value: {entry}")
        continue

    # Sequences: report the length and peek at the first element, if any.
    print(f"Length: {len(entry)}")
    if len(entry) > 0:
        head = entry[0]
        print(f"First element type: {type(head)}")
        if hasattr(head, 'shape'):
            print(f"First element shape: {head.shape}")

## 3. Inspect the Board

In [None]:
# Pull the board out of `info` and normalise it to a NumPy array, printing
# what is found at each unwrapping stage.
if "board" not in info:
    print("'board' not found in info!")
    print(f"Available keys: {info.keys()}")
else:
    board = info["board"]
    print(f"Board type: {type(board)}")

    # A list/tuple (e.g. from a vectorized env) is reduced to its first element.
    if isinstance(board, (list, tuple)):
        print(f"Board is a list with {len(board)} elements")
        if len(board) > 0:
            board = board[0]
        print(f"First board type: {type(board)}")

    # A dict payload may carry the actual grid under its own 'board' key;
    # when it does not, the dict itself is kept.
    if isinstance(board, dict):
        print(f"Board is a dict with keys: {board.keys()}")
        board = board.get('board', board)

    board = np.asarray(board)
    print(f"\nFinal board shape: {board.shape}")
    print(f"Board dtype: {board.dtype}")
    print(f"\nUnique values in board: {np.unique(board)}")
    print(f"\nBoard preview (first 5 rows):\n{board[:5]}")

In [None]:
# Render the extracted board as a colour-coded grid.
# Skipped when the previous cell did not define `board`.
if 'board' in globals():
    fig, ax = plt.subplots(figsize=(6, 12))
    image = ax.imshow(board, cmap='tab10', vmin=0, vmax=8)
    fig.colorbar(image, ax=ax, label='Cell value')
    ax.set_title('Board State\n(0=empty, 1=wall, 2-8=pieces)')
    ax.set_xlabel('Column')
    ax.set_ylabel('Row')
    plt.show()

## 4. Test Feature Calculations

In [None]:
# Feature calculation functions (same as train_lin_grouped.py)

def _get_board_array(board) -> np.ndarray:
    """Return ``board`` as a NumPy array.

    A dict input is unwrapped through its ``"board"`` key first; a dict
    without that key is converted as-is. Any other input goes straight to
    ``np.asarray``.
    """
    payload = board.get("board", board) if isinstance(board, dict) else board
    return np.asarray(payload)


def get_column_heights(board) -> list:
    """Return one height per non-wall column, left to right.

    A column is skipped when its top cell equals 1 (treated as a wall).
    For the remaining columns the scan starts at row 2 and the height is
    ``n_rows - r``, where ``r`` is the first row holding a non-zero cell
    (or ``n_rows`` if the column is empty from row 2 down).

    Prints a warning and returns ``[]`` when the board is not 2-D.
    """
    grid = _get_board_array(board)
    if grid.ndim != 2:
        print(f"WARNING: board.ndim = {grid.ndim}, expected 2")
        return []

    n_rows, n_cols = grid.shape
    # Value of the scan position when no filled cell is found: the scan
    # starts at row 2 and never moves past n_rows.
    scan_end = max(2, n_rows)

    heights_column = []
    for c in range(n_cols):
        if grid[0, c] == 1:  # wall column
            continue
        first_filled = next(
            (r for r in range(2, n_rows) if grid[r, c] != 0),
            scan_end,
        )
        heights_column.append(n_rows - first_filled)
    return heights_column


def count_holes(board) -> int:
    """Count empty cells that sit directly beneath a non-empty cell.

    Only rows with index 2 and above are candidates, and only the cell
    immediately above is inspected. Prints a warning and returns 0 when
    the board is not 2-D.
    """
    grid = _get_board_array(board)
    if grid.ndim != 2:
        print(f"WARNING: board.ndim = {grid.ndim}, expected 2")
        return 0

    # Align every candidate row (2..end) with the row directly above it
    # (1..end-1); both slices have the same shape, including for tiny boards.
    empty_here = grid[2:] == 0
    filled_above = grid[1:-1] != 0
    return int((empty_here & filled_above).sum())


def compute_bumpiness(board) -> int:
    """Sum the absolute height differences between adjacent columns.

    Heights come from ``get_column_heights``; with fewer than two columns
    there are no adjacent pairs and the result is 0.
    """
    column_heights = get_column_heights(board)
    neighbours = zip(column_heights, column_heights[1:])
    return sum(abs(left - right) for left, right in neighbours)

In [None]:
# Run the three feature helpers on the board extracted in section 3.
print("Testing feature calculations on board...")
print(f"\nBoard shape: {board.shape}")
print(f"Board ndim: {board.ndim}")

# Evaluated left to right, so any warnings print in the same order as before.
heights, holes, bumpiness = (
    get_column_heights(board),
    count_holes(board),
    compute_bumpiness(board),
)

print("\n--- Results ---")
print(f"Column heights: {heights}")
print(f"Number of heights: {len(heights)}")
print(f"Holes: {holes}")
print(f"Bumpiness: {bumpiness}")

## 5. Take Some Steps and Check Features

In [None]:
# Fetch the action mask from `info` and summarise how many actions it marks valid.
action_mask = info.get("action_mask")
print(f"Action mask type: {type(action_mask)}")
if action_mask is not None:
    # A non-empty list/tuple is reduced to its first element.
    if isinstance(action_mask, (list, tuple)) and len(action_mask) > 0:
        action_mask = action_mask[0]
    action_mask = np.asarray(action_mask)
    print(f"Action mask shape: {action_mask.shape}")
    n_valid = np.sum(action_mask == 1)
    print(f"Valid actions: {n_valid} / {len(action_mask)}")

In [None]:
# Take one random valid action and recompute the features on the resulting board.
#
# A dedicated seeded generator keeps the sampled action reproducible across
# Restart & Run All; the seed matches the one passed to env.reset() above.
rng = np.random.default_rng(42)

# When `info` carried no action mask, treat every grouped action as valid
# (one row of `obs` per action) -- the same fallback the multi-step loop in
# section 7 uses. Calling np.where on `None == 1` would otherwise operate on
# a 0-d value.
step_mask = np.ones(obs.shape[0]) if action_mask is None else np.asarray(action_mask)
valid_actions = np.where(step_mask == 1)[0]

if len(valid_actions) > 0:
    action = rng.choice(valid_actions)
    print(f"Taking action: {action}")

    next_obs, reward, terminated, truncated, next_info = env.step(action)

    print(f"\nReward: {reward}")
    print(f"Terminated: {terminated}")
    print(f"Lines cleared: {next_info.get('lines_cleared', 'N/A')}")

    # Get next board
    if "board" in next_info:
        next_board = next_info["board"]
        # Unwrap a non-empty list/tuple to its first element; an empty one is
        # passed through so the helpers report the bad shape instead of
        # raising IndexError here.
        if isinstance(next_board, (list, tuple)) and len(next_board) > 0:
            next_board = next_board[0]
        # Same dict-unwrapping + asarray logic the feature helpers use.
        next_board = _get_board_array(next_board)

        print(f"\nNext board shape: {next_board.shape}")

        # Compute features
        next_heights = get_column_heights(next_board)
        next_holes = count_holes(next_board)
        next_bumpiness = compute_bumpiness(next_board)

        print(f"\n--- Next State Features ---")
        print(f"Column heights: {next_heights}")
        print(f"Holes: {next_holes}")
        print(f"Bumpiness: {next_bumpiness}")
else:
    print("No valid actions available; skipping the step.")

## 6. Compare with policies.py Implementation

In [None]:
# Original policies.py functions
def heights_original(board):
    """Original heights function from policies.py, with the bounds check fixed.

    For every column whose top cell is not 1 (wall), scan downward from
    row 2 and record the row index of the first non-empty cell, or
    ``board.shape[0]`` when the column is empty all the way down.

    Note that the recorded value is a row index, not a height:
    ``get_column_heights`` reports ``board.shape[0] - row`` for the same
    scan, so the two outputs are expected to differ by that transformation.

    Args:
        board: 2-D NumPy array of cell values (0 = empty).

    Returns:
        List with one row index per non-wall column, left to right.
    """
    n_rows = board.shape[0]
    heights_column = []
    for i in range(board.shape[1]):
        if board[0, i] != 1:
            j = 2
            # Check the bound BEFORE indexing: the previous operand order
            # evaluated board[j, i] with j == n_rows and raised IndexError
            # for a column with no filled cell below row 2.
            while j < n_rows and board[j, i] == 0:
                j += 1
            heights_column.append(j)
    return heights_column

def holes_original(board):
    """Original holes function from policies.py.

    Counts the empty cells (value 0) in rows 2 and below whose directly
    upper neighbour is non-empty.
    """
    n_rows = board.shape[0]
    n_cols = board.shape[1]
    # Rows 0 and 1 are never counted, so the scan starts at row 2.
    return sum(
        1
        for i in range(2, n_rows)
        for j in range(n_cols)
        if board[i, j] == 0 and board[i - 1, j] != 0
    )

# Compare both implementations on the board produced by the single step above.
# Each value is computed right before it is printed so that any warning from
# the helpers appears in the same position as before.
print("Comparing implementations on next_board...")
if 'next_board' in globals():
    reference_heights = heights_original(next_board)
    print(f"\nOriginal heights(): {reference_heights}")
    notebook_heights = get_column_heights(next_board)
    print(f"Our get_column_heights(): {notebook_heights}")
    reference_holes = holes_original(next_board)
    print(f"\nOriginal holes(): {reference_holes}")
    notebook_holes = count_holes(next_board)
    print(f"Our count_holes(): {notebook_holes}")

## 7. Play Multiple Steps and Visualize

In [None]:
# Reset and play up to 10 random valid steps, recording the board and its
# features after each one.
obs, info = env.reset(seed=123)
# Seeded generator (same seed as the reset) so the sampled actions -- and
# therefore the recorded history -- are reproducible across re-runs.
rng = np.random.default_rng(123)

boards_history = []
features_history = []

for step in range(10):
    # Get action mask; fall back to "every action valid" (one per obs row)
    # when the environment does not report one.
    action_mask = info.get("action_mask", np.ones(obs.shape[0]))
    if isinstance(action_mask, (list, tuple)):
        action_mask = action_mask[0]
    action_mask = np.asarray(action_mask)

    valid_actions = np.where(action_mask == 1)[0]
    if len(valid_actions) == 0:
        print(f"Step {step}: No valid actions, game over")
        break

    action = rng.choice(valid_actions)
    obs, reward, terminated, truncated, info = env.step(action)

    # Get board
    board = info.get("board", None)
    if board is not None:
        if isinstance(board, (list, tuple)):
            board = board[0]
        # Same dict-unwrapping + asarray logic the feature helpers use.
        board = _get_board_array(board)

        boards_history.append(board.copy())
        features_history.append({
            'step': step,
            'heights': get_column_heights(board),
            'holes': count_holes(board),
            'bumpiness': compute_bumpiness(board),
            'reward': reward,
            'lines': info.get('lines_cleared', 0)
        })

    # An episode is over when it is terminated OR truncated; stepping a
    # finished environment without a reset is not valid, so stop in both cases.
    if terminated or truncated:
        outcome = "terminated" if terminated else "truncated"
        print(f"Step {step}: Game {outcome}")
        break

print(f"\nPlayed {len(features_history)} steps")

In [None]:
# Show the per-step features recorded by the rollout above as a table.
# pandas is imported in the notebook's import cell (as `pd`) rather than here,
# so all dependencies are visible in one place.
if features_history:
    df = pd.DataFrame(features_history)
    print(df.to_string())

In [None]:
# Draw the most recently recorded board and report its features.
# Nothing is shown when the rollout recorded no boards.
if boards_history:
    last_board = boards_history[-1]

    fig, ax = plt.subplots(figsize=(6, 12))
    image = ax.imshow(last_board, cmap='tab10', vmin=0, vmax=8)
    fig.colorbar(image, ax=ax, label='Cell value')
    ax.set_title(f'Board after {len(boards_history)} steps')
    ax.set_xlabel('Column')
    ax.set_ylabel('Row')
    plt.show()

    print("\nFeatures:")
    print(f"  Heights: {get_column_heights(last_board)}")
    print(f"  Holes: {count_holes(last_board)}")
    print(f"  Bumpiness: {compute_bumpiness(last_board)}")

In [None]:
# Close the environment created in section 1 now that the walkthrough is done.
env.close()