In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import time
from utils import State, Action  # Assuming this is available from your project files

class ResidualBlock(nn.Module):
    def __init__(self, in_channels, out_channels, stride=1):
        super(ResidualBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, 
                             stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3,
                             stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)
        
        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1,
                         stride=stride, bias=False),
                nn.BatchNorm2d(out_channels)
            )
    
    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self.shortcut(x)
        out = F.relu(out)
        return out

In [None]:
def state_to_features(state):
    """Encode a game state as a flat feature vector of 105 numbers.

    Layout of the returned vector (in order):
        81  board cells, re-encoded as 0 (empty), 1 (player 1), -1 (player 2)
         9  local-board statuses, re-encoded as 1, -1, or 0 (status 0 and 3)
         1  side to move: 1 if ``state.fill_num == 1`` else -1
         2  previous local action, or (-1, -1) when there is none
         1  number of player-1 pieces
         1  number of player-2 pieces
         9  fraction of filled cells in each local board
         1  threat count: rows/columns of a local board holding two pieces of
            one player plus one empty cell

    Args:
        state: Object exposing ``board``, ``local_board_status``, ``fill_num``
            and ``prev_local_action``.
            NOTE(review): assumes ``board`` is laid out as
            (meta_row, meta_col, local_row, local_col) with 81 cells, which is
            what the ``reshape(3, 3, 9)`` below already relies on -- confirm
            against ``utils.State``.

    Returns:
        1-D ``numpy.ndarray`` with 105 elements.
    """
    raw_board = np.asarray(state.board)

    # Cell occupancy with player 2 mapped to -1 so the two players are symmetric.
    board = np.where(raw_board == 2, -1, raw_board).flatten()  # 81 elements

    local_status = np.asarray(state.local_board_status).flatten()  # 9 elements
    local_status = np.where(local_status == 2, -1,
                            np.where(local_status == 3, 0, local_status))  # 0→0, 1→1, 2→-1, 3→0

    fill_num = [1 if state.fill_num == 1 else -1]  # 1 element
    if state.prev_local_action is not None:
        prev_action = np.array(state.prev_local_action)  # 2 elements
    else:
        prev_action = np.array([-1, -1])

    # Piece counts per player.
    player1_count = np.sum(board == 1)
    player2_count = np.sum(board == -1)

    # Fraction of occupied cells in each of the 9 local boards.
    local_filled = np.count_nonzero(raw_board.reshape(3, 3, 9), axis=2).flatten() / 9.0

    # Threat detection on the re-encoded board, one 3x3 grid per local board.
    # With cells in {-1, 0, 1}, a 3-cell line sums to +2 only for (1, 1, 0) and
    # to -2 only for (-1, -1, 0), so |sum| == 2 identifies "two in a line plus
    # one empty cell" for either player. The previous implementation compared
    # the raw board (player 2 stored as 2) against -1 and scanned 9-cell rows,
    # so player-2 threats were never counted and real 3-cell lines were missed.
    local_boards = board.reshape(9, 3, 3)
    row_sums = local_boards.sum(axis=2)
    col_sums = local_boards.sum(axis=1)
    threat_count = int(np.sum(np.abs(row_sums) == 2) + np.sum(np.abs(col_sums) == 2))

    features = np.concatenate([
        board,            # 81
        local_status,     # 9
        fill_num,         # 1
        prev_action,      # 2
        [player1_count],  # 1
        [player2_count],  # 1
        local_filled,     # 9
        [threat_count],   # 1
    ])
    return features

In [None]:
from sklearn.model_selection import train_test_split
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error, r2_score
from utils import load_data

# Build the feature matrix X (one row per state) and the target column y.
data = load_data()
X = np.array([state_to_features(state) for state, _ in data])
y = np.array([value for _, value in data]).reshape(-1, 1)

# Split BEFORE scaling: 85% train, 15% held out (X_temp). Fitting the scaler on
# the full data set would leak held-out statistics into training.
X_train_raw, X_temp_raw, y_train, y_temp = train_test_split(
    X, y, test_size=0.15, random_state=42
)

# Normalize features using statistics of the training rows only, then apply
# the same transform to the held-out rows.
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train_raw)
X_temp = scaler.transform(X_temp_raw)
# Kept so that any cell still referring to X_normalized keeps working.
X_normalized = scaler.transform(X)

# Halve the held-out rows into validation and test sets.
X_val, X_test, y_val, y_test = train_test_split(
    X_temp, y_temp, test_size=0.5, random_state=42
)

# Convert to tensors
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32)
X_val_tensor = torch.tensor(X_val, dtype=torch.float32)
y_val_tensor = torch.tensor(y_val, dtype=torch.float32)

# Create DataLoader
batch_size = 128
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)



In [None]:
class ResNetUTTT(nn.Module):
    """Small ResNet-style value network over a flat feature vector.

    The flat input is projected by a linear layer to ``num_features * 9``
    values, viewed as a ``(num_features, 3, 3)`` feature map, passed through
    two stacks of residual blocks, globally average-pooled and mapped to one
    scalar squashed by ``tanh``.

    Args:
        num_blocks: Number of ``ResidualBlock`` modules per stack.
        num_features: Channel count of the first stack; the second stack uses
            twice as many channels.
        input_dim: Length of the flat input vector. Defaults to 105, the
            previously hard-coded width, so existing callers are unaffected.
    """

    def __init__(self, num_blocks=2, num_features=64, input_dim=105):
        super(ResNetUTTT, self).__init__()
        self.input_dim = input_dim
        self.num_features = num_features

        # Input processing: flat features -> num_features * 9 (e.g. 64 * 9 = 576)
        self.initial_fc = nn.Linear(input_dim, num_features * 9)
        self.bn1 = nn.BatchNorm2d(num_features)

        # Residual stacks operating on the 3x3 feature map. The second stack
        # doubles the channels and uses stride 2 in its first block.
        self.layer1 = self._make_layer(num_features, num_features, num_blocks)
        self.layer2 = self._make_layer(num_features, num_features*2, num_blocks, stride=2)

        # Final layers
        self.avg_pool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc1 = nn.Linear(num_features*2, 128)
        self.fc2 = nn.Linear(128, 1)

    def _make_layer(self, in_channels, out_channels, num_blocks, stride=1):
        """Stack residual blocks; only the first one changes channels/stride."""
        layers = []
        layers.append(ResidualBlock(in_channels, out_channels, stride))
        for _ in range(1, num_blocks):
            layers.append(ResidualBlock(out_channels, out_channels))
        return nn.Sequential(*layers)

    def forward(self, x):
        """Map a ``(batch_size, input_dim)`` tensor to ``(batch_size, 1)`` values in [-1, 1]."""
        batch_size = x.size(0)

        # Transform flat features to a 2D map
        out = self.initial_fc(x)  # (batch_size, num_features * 9)
        out = out.view(batch_size, -1, 3, 3)  # (batch_size, num_features, 3, 3)
        out = F.relu(self.bn1(out))

        # Residual layers
        out = self.layer1(out)
        out = self.layer2(out)

        # Final processing
        out = self.avg_pool(out)
        out = out.view(out.size(0), -1)
        out = F.relu(self.fc1(out))
        out = torch.tanh(self.fc2(out))  # Output between -1 and 1
        return out

In [None]:
# Pick the compute backend: Apple MPS if present, then CUDA, otherwise the CPU.
# Device type strings are lower-case; the previous fallback "CPU" made
# torch.device raise on every machine without MPS.
if torch.backends.mps.is_available():
    device = torch.device("mps")
elif torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(device)
# Build the network with its default hyper-parameters and move its parameters
# and buffers onto the selected device (Module.to works in place).
model = ResNetUTTT()
model.to(device)

In [None]:
from torch.optim.lr_scheduler import ReduceLROnPlateau





NUM_EPOCHS = 200

criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
# Halve the learning rate when the monitored loss has not improved for 3 epochs.
scheduler = ReduceLROnPlateau(optimizer, 'min', patience=3, factor=0.5)

X_train_tensor = torch.tensor(X_train, dtype=torch.float32).to(device)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32).view(-1, 1).to(device)
# Validation split, used only to monitor generalization and drive the scheduler.
X_val_device = torch.tensor(X_val, dtype=torch.float32).to(device)
y_val_device = torch.tensor(y_val, dtype=torch.float32).view(-1, 1).to(device)

# NOTE(review): each epoch is one full-batch gradient step; the train_loader
# built earlier is not used here -- confirm that this is intentional.
for epoch in range(NUM_EPOCHS):
    # Re-enter training mode every epoch: the validation pass below (and any
    # earlier evaluation cell) switches BatchNorm to inference behaviour.
    model.train()
    optimizer.zero_grad()
    outputs = model(X_train_tensor)
    loss = criterion(outputs, y_train_tensor)
    loss.backward()
    optimizer.step()

    # Measure loss on data the optimizer has not seen.
    model.eval()
    with torch.no_grad():
        val_loss = criterion(model(X_val_device), y_val_device).item()

    print(f"Epoch {epoch+1}, Loss: {loss.item()}, Val Loss: {val_loss}")
    # Plateau detection uses the validation loss rather than the training loss.
    scheduler.step(val_loss)

In [None]:
from sklearn.metrics import mean_squared_error, r2_score


# Score the network on the held-out rows (X_temp / y_temp) using a Huber loss,
# plus scikit-learn's MSE and R² computed on the CPU copies of the predictions.
criterion = nn.HuberLoss(delta=0.5)

X_temp_tensor = torch.tensor(X_temp, dtype=torch.float32).to(device)
y_temp_tensor = torch.tensor(y_temp, dtype=torch.float32).reshape(-1, 1).to(device)

# To score a saved checkpoint instead of the in-memory weights, load its
# state dict into `model` before switching to evaluation mode.
model.eval()

# Inference only: no autograd graph is recorded inside this context.
with torch.no_grad():
    temp_outputs = model(X_temp_tensor)
    temp_loss = criterion(temp_outputs, y_temp_tensor)

# Bring the predictions back to host memory for the scikit-learn metrics.
y_temp_pred = temp_outputs.cpu().numpy()

temp_mse = mean_squared_error(y_temp, y_temp_pred)
temp_r2 = r2_score(y_temp, y_temp_pred)

# Labelled report followed by the same three numbers on one line.
print(f"Temporary Data Loss: {temp_loss.item()}")
print(f"Temporary Data MSE: {temp_mse}")
print(f"Temporary Data R²: {temp_r2}")

print( temp_loss.item(), temp_mse, temp_r2)

In [None]:
# Print tensors in full (no "..." truncation), with 10 decimals and very long
# lines, so that repr() of each parameter is a complete literal.
torch.set_printoptions(threshold=float('inf'), precision=10, linewidth=10000)

# Render the state dict as text in OrderedDict([...]) form: one
# "('name', tensor(...))" entry per line, comma-separated except the last.
state = model.state_dict()
last_index = len(state) - 1
pieces = ["OrderedDict([\n"]
for index, (name, tensor) in enumerate(state.items()):
    separator = "" if index == last_index else ","
    pieces.append(f"    ('{name}', {tensor!r}){separator}\n")
pieces.append("])")
output = "".join(pieces)

# Save the rendered text next to the notebook.
with open('model_state_dict.txt', 'w') as file:
    file.write(output)


In [None]:
# Persist only the learned parameters and buffers (not the module object).
trained_weights = model.state_dict()
torch.save(trained_weights, 'best_model1.pt')


In [None]:
# Show every parameter and buffer of the model with 10 decimal places.
torch.set_printoptions(precision=10)
full_state = model.state_dict()
print(full_state)