# SymbolicGym Quickstart Guide

This notebook walks through training a reinforcement learning agent to solve SAT problems with SymbolicGym. It covers:

1. Creating SAT problems
2. Creating and configuring the environment
3. Interacting with the environment
4. Building a simple DQN agent
5. Training the agent
6. Evaluating performance
7. Visualizing results
8. Using oracle guidance

## Environment Setup

First, let's import the necessary libraries and check the SymbolicGym installation.

In [None]:
import os
import sys
import numpy as np
import matplotlib.pyplot as plt
import gymnasium as gym
import random
from collections import deque
import symbolicgym

# Set seeds for reproducibility
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)
random.seed(RANDOM_SEED)

# Disable PyTorch JIT compilation to avoid circular import issues
os.environ['PYTORCH_JIT'] = '0'

# Import PyTorch after setting the environment variable
import torch
import torch.nn as nn
import torch.optim as optim
from torch.nn import functional as F

torch.manual_seed(RANDOM_SEED)

# Check SymbolicGym version
print(f"Using SymbolicGym version: {getattr(symbolicgym, '__version__', 'dev')}")

## 1. Creating SAT Problems

SymbolicGym provides utilities for creating or loading SAT problems in CNF (Conjunctive Normal Form). We'll create a simple problem directly and also show how to load from DIMACS files.
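
For readers who have not seen DIMACS before, here is a minimal sketch of how a DIMACS CNF file maps onto the dictionary layout used in this notebook. The `parse_dimacs` helper is hypothetical and written only for illustration; it is not SymbolicGym's `load_cnf_file`, which may validate more and return additional fields.

```python
def parse_dimacs(text):
    """Parse DIMACS CNF text into a {'clauses', 'num_vars'} dictionary."""
    num_vars = 0
    clauses = []
    current = []
    for line in text.splitlines():
        line = line.strip()
        # Skip blank lines and comment lines ("c ...")
        if not line or line.startswith("c"):
            continue
        # Some benchmark files end with a "%" marker; stop reading there
        if line.startswith("%"):
            break
        if line.startswith("p"):
            # Problem line: "p cnf <num_vars> <num_clauses>"
            num_vars = int(line.split()[2])
            continue
        for token in line.split():
            literal = int(token)
            if literal == 0:
                # A 0 terminates the current clause
                clauses.append(current)
                current = []
            else:
                current.append(literal)
    if current:
        # Tolerate a final clause without a trailing 0
        clauses.append(current)
    return {"clauses": clauses, "num_vars": num_vars}


example = """c tiny example
p cnf 3 4
1 2 3 0
-1 -2 0
2 -3 0
1 -3 0
"""
formula = parse_dimacs(example)
print(formula)
```

Each positive integer `k` stands for variable `k`, and `-k` for its negation, which is the same convention the manually created formula in the next cells follows.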

In [None]:
# You can also load problems from CNF files
from symbolicgym.utils.cnf import load_cnf_file

In [None]:
# Example CNF file path (update as needed for your project structure).
# Notebooks do not define __file__, so resolve the project root relative to
# the current working directory instead.
project_root = os.path.abspath(os.path.join(os.getcwd(), ".."))
cnf_file = os.path.join(project_root, "data", "benchmarks", "uf50-01.cnf")

In [None]:
# Let's create a simple SAT problem
simple_formula = {
    "clauses": [[1, 2, 3], [-1, -2], [2, -3], [1, -3]],
    "num_vars": 3,
    "name": "simple_test_formula"
}

print("Formula structure:")
print(f"Number of variables: {simple_formula['num_vars']}")
print(f"Number of clauses: {len(simple_formula['clauses'])}")
print("Clauses:")
for i, clause in enumerate(simple_formula['clauses']):
    print(f"  Clause {i+1}: {clause}")

# Try to load a CNF file if available
try:
    example_cnf_path = cnf_file
    if os.path.exists(example_cnf_path):
        cnf_data = load_cnf_file(example_cnf_path)
        loaded_formula = {
            "clauses": cnf_data["clauses"],
            "num_vars": cnf_data["num_vars"],
            "name": os.path.basename(example_cnf_path)
        }
        print(f"\nLoaded formula from {example_cnf_path}")
        print(f"Number of variables: {loaded_formula['num_vars']}")
        print(f"Number of clauses: {len(loaded_formula['clauses'])}")
        complex_formula = loaded_formula
    else:
        print("\nExample CNF file not found. Using only the manually created formula.")
        complex_formula = simple_formula
except Exception as e:
    print(f"\nError loading example file: {e}")
    print("Continuing with manually created formula only.")
    complex_formula = simple_formula

## 2. Creating and Configuring the Environment

Now we'll set up the SymbolicGym environment with our SAT problem. We'll explore different configurations including reward functions.
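
This notebook does not spell out how the reward modes are computed, so the following is only a sketch of the usual distinction between sparse and dense rewards for SAT. The functions `count_satisfied`, `sparse_reward`, and `dense_reward` are hypothetical and are not taken from SymbolicGym; the actual reward definitions may differ.

```python
def count_satisfied(clauses, assignment):
    """Count clauses with at least one true literal.

    `assignment` maps a 1-based variable index to a bool.
    """
    return sum(
        any(assignment[abs(lit)] == (lit > 0) for lit in clause)
        for clause in clauses
    )


def sparse_reward(clauses, assignment):
    # Assumed form: reward only when every clause is satisfied
    return 1.0 if count_satisfied(clauses, assignment) == len(clauses) else 0.0


def dense_reward(clauses, before, after):
    # Assumed form: change in the fraction of satisfied clauses after one step
    delta = count_satisfied(clauses, after) - count_satisfied(clauses, before)
    return delta / len(clauses)


clauses = [[1, 2, 3], [-1, -2], [2, -3], [1, -3]]
before = {1: False, 2: False, 3: False}   # satisfies 3 of 4 clauses
after = {1: True, 2: False, 3: False}     # satisfies all 4 clauses
print(sparse_reward(clauses, before), sparse_reward(clauses, after))
print(dense_reward(clauses, before, after))
```

A sparse signal gives the agent nothing to learn from until it stumbles on a full solution, whereas a dense signal rewards each step that makes progress, which is why the training section later uses `reward_mode="dense"`.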

In [None]:
# Create the environment with the simple formula
from symbolicgym.envs.sat import SymbolicSatEnv

In [None]:
# If `num_vars` came back as a dictionary, unwrap it so the environment
# receives a plain integer
if isinstance(complex_formula['num_vars'], dict) and 'num_variables' in complex_formula['num_vars']:
    num_variables = complex_formula['num_vars']['num_variables']
    corrected_formula = {
        'clauses': complex_formula['clauses'],
        'num_vars': num_variables,
        'name': complex_formula['name']
    }
else:
    corrected_formula = complex_formula

# Create environment with default settings (sparse rewards)
env = SymbolicSatEnv(formula=corrected_formula)

# Print environment information
print(f"Action Space: {env.action_space}")
print(f"Observation Space: {env.observation_space}")

# Reset the environment
obs, info = env.reset(seed=RANDOM_SEED)
print("\nInitial observation:")
print(f"Variables: {obs['variables']}")
print(f"Clauses: {obs['clauses']}")
print(f"Variable Assignment: {obs['variable_assignment']}")
print(f"Clause Satisfaction: {obs['clause_satisfaction']}")
print(f"Initial info: {info}")

# Let's try another environment with dense rewards
env_dense = SymbolicSatEnv(formula=simple_formula, reward_mode="dense")
obs_dense, info_dense = env_dense.reset(seed=RANDOM_SEED)
print("\nEnvironment with dense rewards initialized")

## 3. Interacting with the Environment

Let's interact with the environment manually to understand how it works.
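
Going by the messages printed in the cell below, action `i` flips variable `i + 1`. The following sketch models that behavior in plain Python so the effect of a flip on clause satisfaction is easy to follow. The helpers `clause_satisfaction` and `flip` are hypothetical, and the all-False starting assignment is an assumption; the environment may initialize variables differently.

```python
clauses = [[1, 2, 3], [-1, -2], [2, -3], [1, -3]]


def clause_satisfaction(clauses, values):
    """Return one bool per clause; values[i] is the truth value of variable i + 1."""
    return [
        any(values[abs(lit) - 1] == (lit > 0) for lit in clause)
        for clause in clauses
    ]


def flip(values, action):
    """Return a copy of `values` with the variable at index `action` toggled."""
    new_values = list(values)
    new_values[action] = not new_values[action]
    return new_values


values = [False, False, False]
print(clause_satisfaction(clauses, values))   # first clause is unsatisfied

values = flip(values, 1)                      # action 1 flips variable 2
print(values, clause_satisfaction(clauses, values))
```

In this toy trace a single flip of variable 2 satisfies every clause, so an episode on the simple formula can end after very few steps.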

In [None]:
# Take a few random actions
print("Taking random actions:")
env.reset(seed=RANDOM_SEED)

for i in range(5):
    action = env.action_space.sample()
    print(f"\nStep {i+1}, Action: {action} (flipping variable {action+1})")
    obs, reward, terminated, truncated, info = env.step(action)
    print(f"New variable values: {obs['variables']}")
    print(f"Clause satisfaction: {obs['clause_satisfaction']}")
    print(f"Reward: {reward}")
    print(f"Info: {info}")
    if terminated or truncated:
        print("Episode finished!")
        break

# Now let's try with dense rewards
print("\n\nTrying with dense rewards:")
env_dense.reset(seed=RANDOM_SEED)
for i in range(5):
    action = env_dense.action_space.sample()
    print(f"\nStep {i+1}, Action: {action} (flipping variable {action+1})")
    obs, reward, terminated, truncated, info = env_dense.step(action)
    print(f"Reward (dense): {reward}")
    print(f"Clause satisfaction: {obs['clause_satisfaction']}")
    print(f"Info: {info}")
    if terminated or truncated:
        print("Episode finished!")
        break

## 4. Building a Simple DQN Agent

Now let's build a simple Deep Q-Network agent to solve our SAT problem. This is a basic implementation to demonstrate how RL can be applied to SAT problems.
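
For reference, the update performed by the agent's `learn` method can be written as a one-step temporal-difference target computed with a frozen target network. The symbols are notation introduced here, not names from the code: $Q_\theta$ is the policy network, $Q_{\theta^-}$ the target network, $\gamma$ the discount factor, and $d \in \{0, 1\}$ the done flag stored with each transition.

$$
y = r + \gamma \,(1 - d)\, \max_{a'} Q_{\theta^-}(s', a'),
\qquad
\mathcal{L}(\theta) = \operatorname{Huber}\bigl(Q_\theta(s, a) - y\bigr)
$$

The factor $(1 - d)$ removes the bootstrapped term for transitions that end an episode, and the Huber loss corresponds to `F.smooth_l1_loss` with its default settings.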

In [None]:
class DQN(nn.Module):
    def __init__(self, input_size, output_size, hidden_size=64):
        super().__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, output_size)
        
    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return self.fc3(x)

class ReplayBuffer:
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)
    
    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))
    
    def sample(self, batch_size):
        return random.sample(self.buffer, batch_size)
    
    def __len__(self):
        return len(self.buffer)

class DQNAgent:
    def __init__(self, state_size, action_size, hidden_size=64, lr=0.001,
                 gamma=0.99, epsilon=1.0, epsilon_min=0.01, epsilon_decay=0.995,
                 buffer_size=10000, batch_size=64):
        self.state_size = state_size
        self.action_size = action_size
        self.hidden_size = hidden_size
        self.lr = lr
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay
        self.buffer_size = buffer_size
        self.batch_size = batch_size
        
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        
        self.policy_net = DQN(state_size, action_size, hidden_size).to(self.device)
        self.target_net = DQN(state_size, action_size, hidden_size).to(self.device)
        self.target_net.load_state_dict(self.policy_net.state_dict())
        self.target_net.eval()
        
        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=lr)
        self.memory = ReplayBuffer(buffer_size)
        
    def select_action(self, state):
        if random.random() > self.epsilon:
            with torch.no_grad():
                state_tensor = torch.FloatTensor(state).unsqueeze(0).to(self.device)
                q_values = self.policy_net(state_tensor)
                return q_values.max(1)[1].item()
        else:
            return random.randrange(self.action_size)
    
    def update_epsilon(self):
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)
    
    def learn(self):
        if len(self.memory) < self.batch_size:
            return
        
        transitions = self.memory.sample(self.batch_size)
        batch = list(zip(*transitions))
        
        # Convert lists to numpy arrays first for better performance.
        # Explicit dtypes avoid surprises with boolean done flags and
        # float64 rewards when building the tensors.
        states = np.array(batch[0], dtype=np.float32)
        actions = np.array(batch[1], dtype=np.int64)
        rewards = np.array(batch[2], dtype=np.float32)
        next_states = np.array(batch[3], dtype=np.float32)
        dones = np.array(batch[4], dtype=np.float32)
        
        # Then convert numpy arrays to tensors
        states = torch.FloatTensor(states).to(self.device)
        actions = torch.LongTensor(actions).view(-1, 1).to(self.device)
        rewards = torch.FloatTensor(rewards).to(self.device)
        next_states = torch.FloatTensor(next_states).to(self.device)
        dones = torch.FloatTensor(dones).to(self.device)
        
        current_q = self.policy_net(states).gather(1, actions)
        
        next_q = self.target_net(next_states).max(1)[0].detach()
        expected_q = rewards + self.gamma * next_q * (1 - dones)
        
        loss = F.smooth_l1_loss(current_q, expected_q.unsqueeze(1))
        
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        
        return loss.item()
    
    def update_target_net(self):
        self.target_net.load_state_dict(self.policy_net.state_dict())

## 5. Training the Agent

Now we'll train our DQN agent on the SAT problem. We'll use a simple preprocessing function to convert our environment observations into a format suitable for our neural network.
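
Before running the training cell, it helps to check how far exploration actually decays. The agent multiplies epsilon by `epsilon_decay` once per episode, so the schedule can be computed in closed form. The helper names `epsilon_after` and `episodes_to_reach` are hypothetical; the default values mirror the hyperparameters passed to the agent below.

```python
import math


def epsilon_after(episodes, start=1.0, minimum=0.05, decay=0.995):
    """Epsilon after a number of per-episode multiplicative decays, clipped at `minimum`."""
    return max(minimum, start * decay ** episodes)


def episodes_to_reach(target, start=1.0, decay=0.995):
    """Smallest number of episodes after which epsilon is at or below `target`."""
    return math.ceil(math.log(target / start) / math.log(decay))


print(f"epsilon after 100 episodes: {epsilon_after(100):.3f}")
print(f"episodes to reach 0.05: {episodes_to_reach(0.05)}")
```

With these settings epsilon is still about 0.61 after 100 episodes, so most training actions remain random, and reaching the 0.05 floor takes close to 600 episodes. If the agent appears to learn slowly, a smaller `epsilon_decay` or a larger `num_episodes` is worth trying.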

In [None]:
# torch, numpy, deque, and random were already imported in the setup cell

def preprocess_state(observation):
    # Flatten the variables and clauses arrays so they can be concatenated
    # regardless of their original shape
    variables = np.asarray(observation['variables'], dtype=np.float32).ravel()
    clauses = np.asarray(observation['clauses'], dtype=np.float32).ravel()
    
    # Concatenate them into a single state vector
    state = np.concatenate([variables, clauses])
    return state

# Training parameters
num_episodes = 100
target_update = 10
max_steps = 100
env = SymbolicSatEnv(formula=corrected_formula, reward_mode="dense", max_steps=max_steps)

# Get dimensions for our agent
obs, _ = env.reset()
state = preprocess_state(obs)
state_size = len(state)
action_size = env.action_space.n

# Create our agent
agent = DQNAgent(
    state_size=state_size,
    action_size=action_size,
    hidden_size=64,
    lr=0.001,
    gamma=0.99,
    epsilon=1.0,
    epsilon_min=0.05,
    epsilon_decay=0.995,
    buffer_size=10000,
    batch_size=64
)

# Training loop
rewards_history = []
solved_episodes = 0
clause_satisfaction_history = []

for episode in range(num_episodes):
    obs, _ = env.reset()
    state = preprocess_state(obs)
    
    total_reward = 0
    max_satisfaction = 0
    
    for step in range(max_steps):
        # Select and perform action
        action = agent.select_action(state)
        next_obs, reward, terminated, truncated, info = env.step(action)
        next_state = preprocess_state(next_obs)
        
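        # Store transition. Only true termination should cut off the
        # bootstrapped target; after a time-limit truncation the next
        # state still has value.
        agent.memory.push(state, action, reward, next_state, terminated)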
        
        # Move to the next state
        state = next_state
        total_reward += reward
        
        # Track maximum clause satisfaction
        current_satisfaction = info['satisfaction_ratio']
        max_satisfaction = max(max_satisfaction, current_satisfaction)
        
        # Learn
        loss = agent.learn()
        
        # If episode is done, break
        if terminated or truncated:
            if info.get('solved', False):
                solved_episodes += 1
            break
    
    # Update target network
    if episode % target_update == 0:
        agent.update_target_net()
    
    # Update epsilon for exploration
    agent.update_epsilon()
    
    # Save metrics
    rewards_history.append(total_reward)
    clause_satisfaction_history.append(max_satisfaction)
    
    # Print progress
    if (episode + 1) % 10 == 0:
        print(f"Episode {episode + 1}/{num_episodes}, "
              f"Total Reward: {total_reward:.2f}, "
              f"Max Satisfaction: {max_satisfaction:.2f}, "
              f"Epsilon: {agent.epsilon:.2f}, "
              f"Solved Episodes: {solved_episodes}")

print(f"\nTraining completed. Solved {solved_episodes}/{num_episodes} episodes.")

## 6. Evaluating Agent Performance

Let's evaluate the trained agent and see how well it performs.
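
One caveat when reading the numbers printed below: the average step count mixes solved episodes with episodes that simply ran out of steps. A small helper can separate the two. This is a sketch only; `summarize_eval` is a hypothetical function, and it assumes you also record a per-episode `solved` flag in a list, which the evaluation cell does not do by default.

```python
def summarize_eval(solved_flags, steps):
    """Report the solve rate and the mean step count over solved episodes only."""
    solved_steps = [s for ok, s in zip(solved_flags, steps) if ok]
    solve_rate = len(solved_steps) / len(solved_flags) if solved_flags else 0.0
    # None signals that no episode was solved, so there is nothing to average
    mean_steps_solved = sum(solved_steps) / len(solved_steps) if solved_steps else None
    return {"solve_rate": solve_rate, "mean_steps_solved": mean_steps_solved}


# Illustrative values, not results from a real run
summary = summarize_eval([True, False, True, False], [10, 100, 20, 100])
print(summary)
```

Reporting steps over solved episodes only keeps a few timed-out episodes from dominating the average.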

In [None]:
# Set agent to evaluation mode (no exploration)
agent.epsilon = 0

# Evaluation parameters
num_eval_episodes = 20
max_eval_steps = 100

# Metrics
eval_rewards = []
eval_solved = 0
eval_steps = []
eval_satisfaction_ratios = []

# Run evaluation episodes
for episode in range(num_eval_episodes):
    obs, _ = env.reset()
    state = preprocess_state(obs)
    
    total_reward = 0
    steps_taken = 0
    max_satisfaction = 0
    
    for step in range(max_eval_steps):
        steps_taken += 1
        
        # Select action without exploration
        with torch.no_grad():
            state_tensor = torch.FloatTensor(state).unsqueeze(0).to(agent.device)
            q_values = agent.policy_net(state_tensor)
            action = q_values.max(1)[1].item()
        
        # Take action
        next_obs, reward, terminated, truncated, info = env.step(action)
        next_state = preprocess_state(next_obs)
        
        # Update state and metrics
        state = next_state
        total_reward += reward
        current_satisfaction = info['satisfaction_ratio']
        max_satisfaction = max(max_satisfaction, current_satisfaction)
        
        # Check if episode is done
        if terminated or truncated:
            if info.get('solved', False):
                eval_solved += 1
            break
    
    # Save metrics
    eval_rewards.append(total_reward)
    eval_steps.append(steps_taken)
    eval_satisfaction_ratios.append(max_satisfaction)
    
    print(f"Evaluation Episode {episode + 1}: "
          f"Reward: {total_reward:.2f}, "
          f"Steps: {steps_taken}, "
          f"Max Satisfaction: {max_satisfaction:.2f}, "
          f"Solved: {info.get('solved', False)}")

print(f"\nEvaluation completed. "
      f"Solved: {eval_solved}/{num_eval_episodes} episodes. "
      f"Average Reward: {np.mean(eval_rewards):.2f}, "
      f"Average Steps: {np.mean(eval_steps):.2f}, "
      f"Average Satisfaction: {np.mean(eval_satisfaction_ratios):.2f}")

## 7. Visualizing the Results

Finally, let's visualize our training and evaluation results.
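
Per-episode rewards are usually noisy, so a moving average often makes the trend easier to see. The sketch below uses a hypothetical `moving_average` helper written in plain Python; the window size is an arbitrary choice.

```python
def moving_average(values, window):
    """Return the simple moving average of `values` over a sliding window."""
    if window < 1:
        raise ValueError("window must be at least 1")
    if len(values) < window:
        return []
    running = sum(values[:window])
    averages = [running / window]
    for i in range(window, len(values)):
        # Slide the window: add the new value, drop the oldest one
        running += values[i] - values[i - window]
        averages.append(running / window)
    return averages


print(moving_average([1, 2, 3, 4, 5], 3))
```

To overlay it on the training curve, pass `rewards_history` to the helper and plot the result against `range(window - 1, len(rewards_history))` so the smoothed line aligns with the episodes it summarizes.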

In [None]:
plt.figure(figsize=(15, 10))

# Plot training rewards
plt.subplot(2, 2, 1)
plt.plot(rewards_history, label='Training Rewards')
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.title('Training Rewards per Episode')
plt.grid(True)

# Plot clause satisfaction during training
plt.subplot(2, 2, 2)
plt.plot(clause_satisfaction_history, label='Max Clause Satisfaction', color='orange')
plt.xlabel('Episode')
plt.ylabel('Satisfaction Ratio')
plt.title('Max Clause Satisfaction per Episode')
plt.grid(True)

# Plot evaluation rewards
plt.subplot(2, 2, 3)
plt.bar(range(num_eval_episodes), eval_rewards, color='green')
plt.xlabel('Evaluation Episode')
plt.ylabel('Total Reward')
plt.title('Evaluation Rewards per Episode')
plt.grid(True)

# Plot max satisfaction ratio during evaluation
plt.subplot(2, 2, 4)
plt.bar(range(num_eval_episodes), eval_satisfaction_ratios, color='purple')
plt.xlabel('Evaluation Episode')
plt.ylabel('Max Satisfaction Ratio')
plt.title('Evaluation Satisfaction Ratios')
plt.grid(True)

plt.tight_layout()
plt.show()

## 8. Advanced Features: Using Oracle Guidance

SymbolicGym allows integration with traditional SAT solver oracles to guide the learning process. Let's see how to leverage this feature.
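
To give a sense of what a DPLL-style oracle computes, here is a minimal DPLL sketch with unit propagation and branching. It is an illustration only: the functions `simplify` and `dpll` are hypothetical and are not the implementation behind `SimpleDPLLOracle`, whose internals are not shown in this notebook.

```python
def simplify(clauses, literal):
    """Set `literal` to True: drop satisfied clauses and remove its negation elsewhere."""
    result = []
    for clause in clauses:
        if literal in clause:
            continue  # clause is satisfied
        result.append([lit for lit in clause if lit != -literal])
    return result


def dpll(clauses, assignment=None):
    """Return a satisfying assignment {var: bool}, or None if unsatisfiable."""
    assignment = dict(assignment or {})
    # Unit propagation: a single-literal clause forces that literal
    while True:
        if any(len(clause) == 0 for clause in clauses):
            return None  # an empty clause means a conflict
        units = [clause[0] for clause in clauses if len(clause) == 1]
        if not units:
            break
        literal = units[0]
        assignment[abs(literal)] = literal > 0
        clauses = simplify(clauses, literal)
    if not clauses:
        return assignment  # every clause is satisfied
    # Branch on the first literal of the first remaining clause
    literal = clauses[0][0]
    for choice in (literal, -literal):
        result = dpll(simplify(clauses, choice),
                      {**assignment, abs(choice): choice > 0})
        if result is not None:
            return result
    return None


print(dpll([[1, 2, 3], [-1, -2], [2, -3], [1, -3]]))
print(dpll([[1], [-1]]))
```

Variables that never need a value are left out of the returned dictionary. An oracle built on a search like this could answer a "next variable" query by reporting the literal it would branch on from the current partial assignment.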

In [None]:
# Import oracle components
from symbolicgym.oracles.simple_oracle import SimpleDPLLOracle
from symbolicgym.oracles.oracle_protocol import OracleQuery

# Create a simple oracle
try:
    oracle = SimpleDPLLOracle(
        clauses=simple_formula["clauses"],
        num_vars=simple_formula["num_vars"]
    )
    
    # Reset the environment
    obs, info = env.reset()
    
    # Create an oracle query
    query = OracleQuery(
        query_type="next_variable",
        current_assignment=obs["variable_assignment"],
        options={}
    )
    
    # Get oracle guidance
    oracle_response = oracle.query(query)
    
    print("\nOracle Guidance:")
    print(f"Response: {oracle_response}")

except Exception as e:
    print(f"Error using oracle: {e}")
    print("This section requires a working oracle implementation.")

## 9. Conclusion

In this tutorial, we've explored:

1. Creating and configuring SAT environments in SymbolicGym
2. Implementing a DQN agent for solving SAT problems
3. Training and evaluating the agent
4. Visualizing training and evaluation metrics
5. Using oracle guidance for enhanced learning

This is just a starting point for using reinforcement learning to tackle SAT problems. More advanced techniques such as graph neural networks, Monte Carlo tree search (MCTS), and specialized reward shaping may improve performance further.

For more information, check out the SymbolicGym documentation or refer to the source code repository.