In [None]:
# Core Imports and Functions
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from scipy.ndimage import label
from tqdm import tqdm
import matplotlib.pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages
import itertools

# Configuration Parameters
DIM = 3        # Base dimension for patches (3×3)
POWER = 4      # Lattice size = DIM^POWER
MAX_STEPS = POWER + 5  # Maximum coarse-graining steps

def generate_percolation_lattice(size, p):
    return np.random.choice([0, 1], (size, size), p=[1 - p, p])

def check_percolation(lattice):
    labeled, _ = label(lattice)
    
    # Vertical percolation (top-bottom)
    top = set(labeled[0, :]) - {0}
    bottom = set(labeled[-1, :]) - {0}
    vertical = bool(top & bottom)
    
    # Horizontal percolation (left-right)
    left = set(labeled[:, 0]) - {0}
    right = set(labeled[:, -1]) - {0}
    horizontal = bool(left & right)
    
    return float(vertical or horizontal)

class PercolationModel(nn.Module):
    def __init__(self, dim):
        super().__init__()
        self.dim = dim
        self.rule = nn.Sequential(
            nn.Linear(dim * dim, 64),
            nn.ReLU(),
            nn.Linear(64, 1),
            nn.Sigmoid()
        )
    
    def forward(self, x, max_steps=MAX_STEPS):
        b, c, H, W = x.shape
        
        for _ in range(max_steps):
            if H < self.dim or W < self.dim:
                break
            
            # Extract dim×dim patches
            patches = F.unfold(x, kernel_size=self.dim, stride=self.dim)
            patches = patches.permute(0, 2, 1).contiguous()  # (B, #patches, dim*dim)
            patches = patches.view(-1, self.dim * self.dim)   # (B * #patches, dim*dim)
            
            out = self.rule(patches)  # (B * #patches, 1)
            
            new_h, new_w = H // self.dim, W // self.dim
            x = out.view(b, 1, new_h, new_w)  # (B, 1, new_h, new_w)
            _, c, H, W = x.shape

        return x.squeeze()  # (B,)

def prepare_dataset(num_samples, lattice_size):
    data = []
    for _ in tqdm(range(num_samples), desc="Generating data"):
        p = np.random.uniform(0, 1)
        lattice = generate_percolation_lattice(lattice_size, p)
        lbl = check_percolation(lattice)
        data.append((lattice, lbl))
    return data

def train_epoch(model, device, train_data, batch_size, optimizer, criterion, dim):
    model.train()
    running_loss = 0.0
    for i in tqdm(range(0, len(train_data), batch_size), desc="Training"):
        batch = train_data[i : i + batch_size]
        
        # Extract lattices and labels
        lattices = [torch.tensor(x, dtype=torch.float32) for x, _ in batch]
        labels = [y for _, y in batch]
        
        # Stack into a (B, 1, H, W) tensor
        inputs = torch.stack(lattices).unsqueeze(1).to(device)
        targets = torch.tensor(labels, dtype=torch.float32).to(device)
        
        optimizer.zero_grad()
        outputs = model(inputs, max_steps=MAX_STEPS)  # (B,) after .squeeze()
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item() * len(batch)
    
    return running_loss / len(train_data)

def test_systems(model, dim, power, device="cpu", num_tests=10,
                 system_size="standard", p_range=(0, 1), verbose=True):
    """
    Tests model on raw lattices without initial coarse-graining.
    Args:
        system_size: 'smaller' (dim^(power-1)), 'standard' (dim^power), or 'larger' (dim^(power+1))
    """
    model.eval()
    
    size_power = {
        "smaller": power - 1,
        "standard": power,
        "larger": power + 1
    }[system_size]
    
    lattice_size = dim ** size_power
    results = []
    
    for _ in tqdm(range(num_tests), desc=f"Testing {lattice_size}x{lattice_size}"):
        p = np.random.uniform(*p_range)
        lattice = generate_percolation_lattice(lattice_size, p)
        true_label = check_percolation(lattice)
        
        input_tensor = torch.tensor(lattice, dtype=torch.float32)
        input_tensor = input_tensor.unsqueeze(0).unsqueeze(0).to(device)  # [1,1,H,W]
        
        with torch.no_grad():
            pred = model(input_tensor).item()
        
        results.append((lattice, true_label, pred))
    
    threshold = 0.5
    correct = sum(1 for _, lbl, pred in results if (pred > threshold) == lbl)
    acc = correct / num_tests
    
    if verbose:
        print(f"\n{lattice_size}x{lattice_size} Results:")
        print(f"Accuracy: {acc:.2%}")
        print(f"Avg prediction | Perc: {np.mean([p for _, l, p in results if l == 1]):.3f}")
        print(f"Avg prediction | Non-Perc: {np.mean([p for _, l, p in results if l == 0]):.3f}")
    
    return results

In [None]:
# Initialization
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
SIZE = DIM ** POWER
TRAIN_SAMPLES = 10000
BATCH_SIZE = 10
EPOCHS = 5

def run_experiment(DIM, POWER, run_num):
    SIZE = DIM ** POWER
    MAX_STEPS = POWER + 5
    
    # Generate training dataset
    train_data = prepare_dataset(TRAIN_SAMPLES, SIZE)
    
    # Initialize model, optimizer, loss
    model = PercolationModel(dim=DIM).to(DEVICE)
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.BCELoss()

    # Training loop
    for epoch in range(EPOCHS):
        loss = train_epoch(model, DEVICE, train_data, BATCH_SIZE, optimizer, criterion, DIM)

    # Testing configurations
    test_configs = [
        {"system_size": "smaller", "num_tests": 100, "p_range": (0.1, 0.9)},
        {"system_size": "standard", "num_tests": 100, "p_range": (0.1, 0.9)},
        {"system_size": "larger", "num_tests": 100, "p_range": (0.1, 0.9)},
        {"system_size": "standard", "num_tests": 200, "p_range": (0.58, 0.61)},
    ]
    
    test_results = {}
    for config in test_configs:
        key = f"{config['system_size']}_{config['p_range'][0]}-{config['p_range'][1]}"
        test_results[key] = test_systems(
            model=model,
            dim=DIM,
            power=POWER,
            device=DEVICE,
            **config
        )

    # Summarize test results
    summaries = {}
    for key, results in test_results.items():
        acc = sum((pred > 0.5) == lbl for _, lbl, pred in results) / len(results)
        mean_perc = np.nanmean([pred for _, lbl, pred in results if lbl == 1])
        mean_non_perc = np.nanmean([pred for _, lbl, pred in results if lbl == 0])
        summaries[key] = (acc, mean_perc, mean_non_perc)

    # Generate rule-projection data by feeding 3×3 patch of constant p
    ps_coarse = np.linspace(0.0, 1.0, 101)
    ps_fine = np.linspace(0.5, 0.7, 201)
    ps = np.unique(np.concatenate((ps_coarse, ps_fine)))
    ps.sort()

    mean_outputs = []
    with torch.no_grad():
        for p in ps:
            # Create a single 3×3 patch with all entries = p
            patch = np.full((DIM * DIM,), p, dtype=np.float32)        # shape: (dim*dim,)
            patch_tensor = torch.from_numpy(patch).unsqueeze(0).to(DEVICE)  # (1, dim*dim)
            out = model.rule(patch_tensor).cpu().numpy().item()        # scalar
            mean_outputs.append(out)

    return summaries, (ps, mean_outputs)

In [None]:
# Main experiment loop
all_results = {}
all_plots = {}
combinations = [(3, 2), (3, 3), (3, 4), (4, 2), (4, 3), (4, 4)]
n_runs = 10

for DIM, POWER in combinations:
    key = f"{DIM}^{POWER}"
    all_results[key] = []
    all_plots[key] = []
    
    for run in range(n_runs):
        print(f"\nRunning {key} - Run {run+1}/{n_runs}")
        summaries, plot_data = run_experiment(DIM, POWER, run)
        all_results[key].append(summaries)
        all_plots[key].append(plot_data)
    

In [None]:
# Generate consolidated PDF report
with PdfPages("consolidated_results.pdf") as pdf:
    for DIM, POWER in combinations:
        key = f"{DIM}^{POWER}"
        
        plt.figure(figsize=(10, 6))
        fixed_points = []
        
        for run_idx, (ps, outputs) in enumerate(all_plots[key]):
            plt.plot(ps, outputs, alpha=0.5, label=f"Run {run_idx+1}")
            
            # Find intersection with f(p) = p
            diff = np.array(outputs) - np.array(ps)
            sign_changes = np.where(np.diff(np.sign(diff)))[0]
            
            # For each sign change, find the approximate fixed point
            run_fixed_points = []
            for i in sign_changes:
                if i+1 < len(ps):
                    x0, x1 = ps[i], ps[i+1]
                    y0, y1 = outputs[i], outputs[i+1]
                    # Linear approximation of intersection
                    t = (x0 - y0) / ((y1 - y0) - (x1 - x0))
                    fixed_point = x0 + t * (x1 - x0)
                    run_fixed_points.append(fixed_point)
            
            if run_fixed_points:
                # Take the fixed point closest to the known critical point (~0.5927)
                best_fp = min(run_fixed_points, key=lambda x: abs(x - 0.5927))
                fixed_points.append(best_fp)
                plt.scatter([best_fp], [best_fp], color='black', s=20)
        
        # Plot f(p) = p line
        plt.plot([0, 1], [0, 1], color="black", linestyle="--", label="f(p) = p")
        
        # Add mean fixed point if available
        if fixed_points:
            mean_fp = np.mean(fixed_points)
            plt.plot(mean_fp, mean_fp, color='red', label=f"Mean Fixed Point: {mean_fp:.4f}")
        
        plt.title(f"NFC Rule Projection - Configuration {key}", fontsize=18)
        plt.xlabel("Density (p)", fontsize=14)
        plt.ylabel(r"$f_{\theta}(p\mathbf{1})$", fontsize=14)
        plt.legend(fontsize=12)
        pdf.savefig()
        plt.close()

# Generate consolidated text report
with open("consolidated_accuracies.txt", "w", encoding="utf8") as f:
    for DIM, POWER in combinations:
        key = f"{DIM}^{POWER}"
        
        f.write(f"\n{'='*40}\nConfiguration: {key}\n{'='*40}\n")
        
        # Get fixed points for this configuration
        config_fixed_points = []
        for run_data in all_plots[key]:
            ps, outputs = run_data
            diff = np.array(outputs) - np.array(ps)
            sign_changes = np.where(np.diff(np.sign(diff)))[0]
            
            run_fixed_points = []
            for i in sign_changes:
                if i+1 < len(ps):
                    x0, x1 = ps[i], ps[i+1]
                    y0, y1 = outputs[i], outputs[i+1]
                    t = (x0 - y0) / ((y1 - y0) - (x1 - x0))
                    fixed_point = x0 + t * (x1 - x0)
                    run_fixed_points.append(fixed_point)
            
            if run_fixed_points:
                best_fp = min(run_fixed_points, key=lambda x: abs(x - 0.5927))
                config_fixed_points.append(best_fp)
        
        if config_fixed_points:
            mean_fp = np.mean(config_fixed_points)
            std_fp = np.std(config_fixed_points)
            f.write("Fixed Points (intersection with f(p) = p):\n")
            f.write(f"  Values: {[f'{fp:.6f}' for fp in config_fixed_points]}\n")
            f.write(f"  Mean: {mean_fp:.6f} ± {std_fp:.6f}\n\n")
        else:
            f.write("No clear fixed points found (no intersection with f(p) = p)\n\n")
        
        # Write accuracy information with full details for each test type
        test_types = ["smaller_0.1-0.9", "standard_0.1-0.9", "larger_0.1-0.9", "standard_0.58-0.61"]
        for test_type in test_types:
            f.write(f"{test_type.replace('_', ' ')}:\n")
            
            # Get all runs' results for this test type
            all_runs_results = [run[test_type] for run in all_results[key]]
            
            # Write detailed accuracy for each run
            for run_idx, (acc, mean_perc, mean_non_perc) in enumerate(all_runs_results):
                f.write(f"  Run {run_idx+1}:\n")
                f.write(f"    Accuracy: {acc:.4f}\n")
                f.write(f"    Avg prediction | Perc: {mean_perc:.4f}\n")
                f.write(f"    Avg prediction | Non-Perc: {mean_non_perc:.4f}\n")
            
            # Calculate and write summary statistics
            accs = [x[0] for x in all_runs_results]
            mean_percs = [x[1] for x in all_runs_results]
            mean_non_percs = [x[2] for x in all_runs_results]
            
            f.write("\n  Summary statistics:\n")
            f.write(f"    Accuracy: {np.mean(accs):.4f} ± {np.std(accs):.4f}\n")
            f.write(f"    Avg prediction | Perc: {np.mean(mean_percs):.4f} ± {np.std(mean_percs):.4f}\n")
            f.write(f"    Avg prediction | Non-Perc: {np.mean(mean_non_percs):.4f} ± {np.std(mean_non_percs):.4f}\n\n")