In [31]:
import os
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
from tqdm.auto import tqdm
import multiprocessing as mp
import matplotlib.pyplot as plt

from FF.fractal_generation import branching_network as bn
from fracstack import measure_dimension, portfolio_plot


In [22]:
################################################################################
# 1. DEFINE YOUR FIXED & VARIABLE PARAMETERS
################################################################################

# Fixed "base" parameters that might not vary:
neuron_params_base = {
    'depth': 3,
    'mean_soma_radius': 1,
    'std_soma_radius': 1,
    'D': 1,  
    'branch_angle': np.pi / 4,
    'mean_branches': 1,
    'weave_type': 'Gauss',  
    'randomness': 0.1,
    'curviness': 'Gauss',
    'curviness_magnitude': 1,
    'n_primary_dendrites': 3,
    'initial_thickness': 5,
    'total_length': 500
}

network_params_base = {
    'width': 2049,
    'height': 2049,
    'num_neurons': 3,
    'edge_margin': 10,
}

param_names = [
    'depth',
    'D', 
    'mean_branches',
    'n_primary_dendrites',
    'branch_angle',
    'total_length',
    'num_neurons'
]

################################################################################
# 3. DATASET GENERATION: PARAM SAMPLES -> FRACTALS -> MEASURED D
################################################################################

def latin_hypercube_sampling(param_bounds, n_samples):
    """
    Latin Hypercube Sampling that guarantees inclusion of parameter bounds.
    
    param_bounds: List of (low, high) for each parameter dimension
    n_samples: Number of sample points to create (must be > 2)
    
    Returns: Array of shape (n_samples, d) where d is number of parameters
    """
    if n_samples < 3:
        raise ValueError("n_samples must be at least 3 to include endpoints and middle points")
        
    d = len(param_bounds)
    samples = np.empty((n_samples, d), dtype=float)
    
    for i, (low, high) in enumerate(param_bounds):
        # Create n_samples-2 points using LHS (excluding endpoints)
        bins = np.linspace(0, 1, n_samples - 1)  # n_samples - 1 points to create n_samples - 2 bins
        
        # Get the lower edge of each bin
        bin_starts = bins[:-1]
        # Get the bin width
        bin_width = 1.0 / (n_samples - 2)
        
        # Generate random offsets within each bin
        random_offsets = np.random.rand(n_samples - 2) * bin_width
        
        # Final positions = bin_start + random_offset
        positions = bin_starts + random_offsets
        
        # Randomly permute these positions
        positions = np.random.permutation(positions)
        
        # Add endpoints (0 and 1)
        positions = np.concatenate(([0], positions, [1]))
        
        # Map from [0,1] to [low,high]
        samples[:, i] = low + positions * (high - low)
    
    return samples

def generate_sample_fixed_params(param_vector):
    """
    Generate a single sample of neuron and network parameters and measure its fractal dimension,
    given a fixed parameter vector (one row from the LHS output).

    sample_id: integer index for logging/debugging purposes.
    param_vector: a 1D numpy array (or list) of length len(param_bounds),
                  containing values for each parameter in the same order as param_names.

    Returns:
        (param_array, d_measured) where
         - param_array is the final parameter values (floats, possibly rounded for int params)
         - d_measured is the measured fractal dimension (float).
    """
    # Separate sampling for neuron- and network-related parameters
    # We'll copy them from the base dictionaries to avoid mutating global state
    
    neuron_sampled_params = {}
    network_sampled_params = network_params_base.copy()

    # Loop over each dimension in param_vector
    for name, val in zip(param_names, param_vector):
        # Round int parameters
        if name in ['depth', 'n_primary_dendrites', 'num_neurons']:
            val = int(round(val))

        # Assign to the correct dictionary
        if name in neuron_params_base:
            neuron_sampled_params[name] = val
        elif name in network_params_base:
            network_sampled_params[name] = val

    # Merge neuron parameters with defaults
    neuron_params = neuron_params_base.copy()
    neuron_params.update(neuron_sampled_params)

    # Generate fractal & measure dimension
    try:
        n_tries = 0
        generated_fractal = False
        while not generated_fractal and n_tries < 10:
            testnet = bn.generate_network(
                network_id=f'nntest',
                neuron_params=neuron_params,
                network_params=network_sampled_params
            )

            net_masks = testnet.generate_binary_mask()
            fractal = (net_masks['outline'] > 0).astype(np.uint8)

            d_measured, _, _, _, R2 = measure_dimension(
                fractal,
                num_sizes=50,
                min_size=16,
                max_size=506,
                num_pos=10,
                pad_factor=1.5,
                multiprocessing=False
            )

            if (1 < d_measured < 2) and (R2 > 0.99):
                generated_fractal = True
                break
            else:
                n_tries += 1
                continue
        
        if not generated_fractal:   
            return None


    except Exception as e:
        print(f"Error in sample {e}")
        return None

    # Convert parameters to array in the same order as param_names
    # (note that param_vector might be floats; we want to return the final, possibly rounded, values)
    final_param_array = []
    for name, val in zip(param_names, param_vector):
        # If integer param, use the integer we stored above
        if name in ['depth', 'n_primary_dendrites', 'num_neurons']:
            final_val = int(round(val))
        else:
            final_val = float(val)
        final_param_array.append(final_val)

    final_param_array = np.array(final_param_array, dtype=np.float32)

    return (final_param_array, float(d_measured))

def generate_sample(sample_id, seed):
    """
    Generate a single sample of neuron and network parameters and measure its fractal dimension.
    This function runs independently in each worker process.
    """
    # Set the random seed for this worker
    np.random.seed(seed)

    # Separate sampling for neuron and network parameters
    neuron_sampled_params = {}
    network_sampled_params = network_params_base.copy()

    # Sample each parameter in param_bounds
    for (low, high), name in zip(param_bounds, param_names):
        val = np.random.uniform(low, high)

        # Cast to int for integer parameters
        if name in ['depth', 'n_primary_dendrites', 'num_neurons']:
            val = int(round(val))

        # Assign to appropriate dictionary
        if name in neuron_params_base:
            neuron_sampled_params[name] = val
        elif name in network_params_base:
            network_sampled_params[name] = val

    # Merge neuron parameters with defaults
    neuron_params = neuron_params_base.copy()
    neuron_params.update(neuron_sampled_params)

    # Generate fractal & measure dimension
    try:
        testnet = bn.generate_network(
            network_id=f'nntest_{sample_id}',
            neuron_params=neuron_params,
            network_params=network_sampled_params
        )
        net_masks = testnet.generate_binary_mask()
        fractal = (net_masks['outline'] > 0).astype(np.uint8)
        d_measured, _, _, _, _ = measure_dimension(
            fractal,
            num_sizes=50,
            min_size=16,
            max_size=506,
            num_pos=10,
            pad_factor=2,
            multiprocessing=False
        )
    except Exception as e:
        print(f"Error in sample {sample_id}: {e}")
        return None

    # Convert parameters to array in the same order as param_names
    param_array = np.array([neuron_params[name] if name in neuron_params 
                           else network_sampled_params[name] 
                           for name in param_names])
    
    return (param_array, float(d_measured))

def create_dataset_parallel(n_samples=2000, all_param_sets=None, num_workers=None):
    """
    Generate a dataset of parameter sets -> measured dimension in parallel.
    """
    if num_workers is None:
        num_workers = mp.cpu_count()
        print(f"Using {num_workers} workers")  # Use all available CPU cores by default

    # Create a multiprocessing pool
    with mp.Pool(processes=num_workers) as pool:
        tasks = [all_param_sets[i] for i in range(n_samples)]
        results = pool.map(generate_sample_fixed_params, tasks)

    # Filter out any failed results (None)
    results = [result for result in results if result is not None]

    # Split the results into parameters and dimensions
    param_samples, d_values = zip(*results)
    print(f'% of samples failed: {100*(n_samples-len(results))/n_samples:.2f}%')

    param_samples = np.array(param_samples, dtype=np.float32)
    d_values = np.array(d_values, dtype=np.float32)

    # make param_samples a dataframe with d_measured as first column
    samples_df = pd.DataFrame(param_samples, columns=param_names)
    samples_df.insert(0, 'd_measured', d_values)

    return param_samples, d_values, samples_df


################################################################################
# 4. SURROGATE MODEL: PARAMS -> D
################################################################################

class ParamToDimensionModel(nn.Module):
    def __init__(self, input_dim, hidden_dim=64):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1)  
        )
    def forward(self, x):
        return self.net(x)

def train_surrogate_model(X, y, epochs=50, lr=1e-3, batch_size=64, loss_threshold=0.01):
    """
    Train a PyTorch regression model from parameter vectors X to fractal dimension y.
    X: shape (n_samples, d_params)
    y: shape (n_samples, )
    """
    # Build dataset
    dataset = TensorDataset(X, y)
    loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

    # Initialize model, loss, optimizer
    model = ParamToDimensionModel(input_dim=X.shape[1], hidden_dim=128)
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)

    for epoch in tqdm(range(epochs), desc="Training"):
        total_loss = 0.0

        for batch_x, batch_y in loader:
            optimizer.zero_grad()
            pred = model(batch_x)
            loss = criterion(pred, batch_y.unsqueeze(1))
            loss.backward()
            optimizer.step()
            total_loss += loss.item() * len(batch_x)
        
        mse = total_loss / len(dataset)
        if (epoch+1) % 10 == 0:
            print(f"Epoch {epoch+1}/{epochs}, MSE={mse:.4f}")
            
        if mse < loss_threshold:
            print(f"Reached loss threshold {loss_threshold} at epoch {epoch+1}")
            break
        
    return model



In [None]:
param_bounds = [
    (3, 11),          # depth (integer)
    (1, 2),           # D (float)
    (0.75, 1.5),       # mean_branches (float)
    (3, 8),           # n_primary_dendrites (integer)
    (np.pi/8, np.pi/2),     # branch_angle (float)
    (50, 500),      # total_length (float)
    (3, 50)           # num_neurons (integer)
]

n_samples = 1000
all_param_sets = latin_hypercube_sampling(param_bounds, n_samples=n_samples)
param_names = ['depth', 'D', 'mean_branches', 'n_primary_dendrites', 'branch_angle', 'total_length', 'num_neurons']

param_array, d_array, samples_df = create_dataset_parallel(
    n_samples=n_samples,
    all_param_sets=all_param_sets
)

fig, axes = plt.subplots(4, 2, figsize=(15, 20))
axes = axes.flatten()

params = ['D', 'depth', 'mean_branches', 'n_primary_dendrites', 
          'branch_angle', 'total_length', 'num_neurons']

for i, param in enumerate(params):
    axes[i].scatter(samples_df[param], samples_df['d_measured'], alpha=0.1)
    axes[i].set_xlabel(f'{param}')
    axes[i].set_ylabel('Measured D')
    axes[i].set_title(f'{param} vs Measured D')
    axes[i].grid(True)

plt.tight_layout()
plt.show()

print(np.max(samples_df['d_measured']))
print(np.min(samples_df['d_measured']))

samples_df

In [15]:
df_name = 'samples_df_1000.csv'
samples_df.to_csv(rf'/home/apd/Projects/FractalFluency/notebooks/{df_name}', index=False)


In [40]:
# 6.2 Convert to PyTorch Tensors
X = torch.from_numpy(param_array)        # shape: (n_samples, d_params)
y = torch.from_numpy(d_array)            # shape: (n_samples,)

# 6.3 Train the surrogate model
print("Training surrogate model...")
model = train_surrogate_model(X, y, epochs=300, lr=1e-5, batch_size=100)

Training surrogate model...


Training:   0%|          | 0/300 [00:00<?, ?it/s]

Epoch 10/300, MSE=0.4200
Epoch 20/300, MSE=0.2986
Epoch 30/300, MSE=0.2543
Epoch 40/300, MSE=0.2309
Epoch 50/300, MSE=0.2187
Epoch 60/300, MSE=0.1995
Epoch 70/300, MSE=0.1883
Epoch 80/300, MSE=0.1702
Epoch 90/300, MSE=0.1513
Epoch 100/300, MSE=0.1424
Epoch 110/300, MSE=0.1343
Epoch 120/300, MSE=0.1270
Epoch 130/300, MSE=0.1198
Epoch 140/300, MSE=0.1129
Epoch 150/300, MSE=0.1064
Epoch 160/300, MSE=0.1004
Epoch 170/300, MSE=0.0936
Epoch 180/300, MSE=0.0873
Epoch 190/300, MSE=0.0821
Epoch 200/300, MSE=0.0774
Epoch 210/300, MSE=0.0729
Epoch 220/300, MSE=0.0684
Epoch 230/300, MSE=0.0643
Epoch 240/300, MSE=0.0596
Epoch 250/300, MSE=0.0550
Epoch 260/300, MSE=0.0498
Epoch 270/300, MSE=0.0453
Epoch 280/300, MSE=0.0416
Epoch 290/300, MSE=0.0391
Epoch 300/300, MSE=0.0366


In [41]:
################################################################################
# 5. INVERTING THE SURROGATE: TARGET D -> PARAMS (GRADIENT-BASED SEARCH)
################################################################################

def find_params_for_D(target_D, model, init_params=None, param_bounds=None,
                      steps=500, lr=1e-2, verbose=True, loss_threshold=0.01, plot_loss=False):
    if init_params is None:
        # Randomly initialize within bounds
        init_values = []
        for (low, high) in param_bounds:
            val = np.random.uniform(low, high)
            init_values.append(val)
        init_params = np.array(init_values, dtype=np.float32)


    n_neurons = int(np.random.uniform(0.8, 1.2) * (np.round(3 + (target_D - 1.0) * (50 - 3))))
    D = np.random.uniform(0.8, 1.2)*target_D

    # Make a learnable tensor
    params_tensor = torch.tensor(init_params, dtype=torch.float32, requires_grad=True)
    optimizer = torch.optim.SGD([params_tensor], lr=lr)

    target_tensor = torch.tensor([target_D], dtype=torch.float32)

    D_param_index = 1  # Adjust these indices to match your parameter order
    n_neurons_index = 6  # Adjust this to the correct index for n_neurons

    losses = []  # Track losses if plotting requested

    for step in tqdm(range(steps), desc="Searching", leave=False):
        optimizer.zero_grad()
        

        with torch.no_grad():
            params_tensor[D_param_index] = D
            params_tensor[n_neurons_index] = n_neurons
            
        
        # Surrogate prediction
        predicted_D = model(params_tensor.unsqueeze(0)).squeeze(0)
        loss = (predicted_D - target_tensor).abs()
        
        if plot_loss:
            losses.append(loss.item())
            
        loss.backward()
        optimizer.step()

        # Clamp all parameters to bounds and ensure D stays at target
        with torch.no_grad():
            for i, (low, high) in enumerate(param_bounds):
                if i == D_param_index:
                    params_tensor[i] = target_D
                elif i == n_neurons_index:
                    params_tensor[i] = n_neurons    
                else:
                    params_tensor[i].clamp_(low, high)
        
        if verbose and step % 100 == 0:
            print(f"Step {step}, predicted_D={predicted_D.item():.3f}, loss={loss.item():.3f}")
            
        if loss.item() < loss_threshold:
            if verbose:
                print(f"Reached loss threshold {loss_threshold} at step {step}")
            break

      
    print(f'Loss after {step} steps: {loss.item()}')

    if plot_loss:
        plt.figure(figsize=(10,5))
        plt.plot(losses)
        plt.xlabel('Step')
        plt.ylabel('Loss')
        plt.title('Loss vs. Optimization Step')
        plt.grid(True)
        plt.show()
        
    return params_tensor.detach().numpy()


def make_portfolio(target_D_list, n_attempts=10, save_dir=None):
    
    found_params_list = []

    for i, target_D in enumerate(target_D_list):

        generated_fractal = False
        attempts = 0


        np.random.seed(i)

        current_params = [
                        np.random.randint(3, 11),          # depth (integer)
                        np.random.uniform(1, 2),           # D (float)
                        np.random.uniform(0.75, 1.5),       # mean_branches (float)
                        np.random.randint(3, 8),           # n_primary_dendrites (integer)
                        np.random.uniform(np.pi/8, np.pi/2),     # branch_angle (float)
                        np.random.randint(50, 500),      # total_length (float)
                        np.random.randint(3, 30)           # num_neurons (integer)
                        ]   
        
        while generated_fractal is False and attempts < n_attempts:

            found_params = find_params_for_D(
                target_D=target_D,
                model=model,
                init_params=current_params, 
                param_bounds=param_bounds,
                steps=50000,
                lr=1e-4,
                verbose=False,
                plot_loss=False,
                loss_threshold=0.01
            )

            neuron_params = neuron_params_base.copy()
            network_params = network_params_base.copy()

            for name, val in zip(param_names, found_params):
                if name in neuron_params_base:
                    neuron_params[name] = float(val)
                else:
                    network_params[name] = float(val)
        
            # Ensure integer values for certain parameters
            for param in ['depth', 'n_primary_dendrites', 'num_neurons']:
                if param in neuron_params:
                    neuron_params[param] = int(round(neuron_params[param]))
                if param in network_params:
                    network_params[param] = int(round(network_params[param]))

            branch_network = bn.generate_network(network_id=f'nntest_{target_D}', neuron_params=neuron_params, network_params=network_params)
            net_masks = branch_network.generate_binary_mask()
            fractal = (net_masks['outline'] > 0).astype(np.uint8)
            d_value, _, _, _, R2 = measure_dimension(fractal, num_sizes=50, min_size=16, max_size=506, num_pos=100, pad_factor=2, multiprocessing=True)

            if np.abs(d_value - target_D) < 0.025:
                if R2 > 0.99:
                    generated_fractal = True
                    found_params_list.append(found_params)
                    break
                else:
                    attempts += 1
                    print(f'R2={R2} is too low for target_D={target_D}')
            else:
                attempts += 1
                print(f'd_value={d_value} is too far from target_D={target_D}')
        
        if not generated_fractal:
            print(f'Failed to generate fractal for target_D={target_D}')
            continue

        portfolio_plot(fractal, target_D0=target_D, save_dir=save_dir, f_name=f'branch_network_example_{target_D}.png')

    
    params_df = pd.DataFrame(found_params_list, columns=param_names)
    params_df.insert(0, 'target_D', target_D_list)    

    return params_df

In [42]:
# 6.4 Invert for a single target D
target_D = 1.9
print(f"\nFinding params for target D={target_D}")

found_params = find_params_for_D(
    target_D=target_D,
    model=model,
    init_params=None,    # or pass a custom init
    param_bounds=param_bounds,
    steps=20000,
    lr=1e-3,
    verbose=False,
    plot_loss=False,
    loss_threshold=0.05
)

found_params_df = pd.DataFrame([found_params], columns=param_names)
found_params_df


Finding params for target D=1.9


Searching:   0%|          | 0/20000 [00:00<?, ?it/s]

Loss after 19999 steps: 0.33663511276245117


Unnamed: 0,depth,D,mean_branches,n_primary_dendrites,branch_angle,total_length,num_neurons
0,10.373749,1.9,0.75,5.417548,1.570796,81.318222,43.0


In [None]:
save_dir = r'/home/apd/Projects/FractalFluency/datasets/fractal_portfolio/branch_networks'

target_D_list = np.linspace(1.1, 1.9, 9)

make_portfolio(target_D_list=target_D_list, n_attempts=10, save_dir=save_dir)

In [None]:
# 6.5 Generate fractals across a range from D=1.1 to D=1.9 (smooth walk)
target_Ds = np.linspace(1.1, 1.9, 9)
print("\nGenerating smooth range of fractals from D=1.1 to D=1.9...")
current_params = None
generated_params_list = []

for d_target in target_Ds:
    print(f"Target D={d_target:.2f}")
    current_params = find_params_for_D(
        target_D=d_target,
        model=model,
        init_params=current_params,
        param_bounds=param_bounds,
        steps=50000,
        lr=1e-3,
        verbose=False,
        plot_loss=False,
        loss_threshold=0.05
    )
    generated_params_list.append(current_params)

# 6.6 Create fractals using these found params and measure real D
for i, d_target in enumerate(target_Ds):
    solution_params = generated_params_list[i]
    
    # Build separate dicts for neuron and network params
    neuron_params = neuron_params_base.copy()
    network_params = network_params_base.copy()
    
    # Assign each parameter to the correct dictionary
    for name, val in zip(param_names, solution_params):
        if name in neuron_params_base:
            neuron_params[name] = float(val)
        else:
            network_params[name] = float(val)
    
    # Ensure integer values for certain parameters
    for param in ['depth', 'n_primary_dendrites']:
        if param in neuron_params:
            neuron_params[param] = int(round(neuron_params[param]))
    network_params['num_neurons'] = int(round(network_params['num_neurons']))
    
    neuronn = bn.generate_network(
        network_id=f'nntest_{i}', 
        neuron_params=neuron_params, 
        network_params=network_params
    )
    net_masks = neuronn.generate_binary_mask()
    fractal = (net_masks['outline'] > 0).astype(np.uint8)
    measured_d, _, _, _, _ = measure_dimension(fractal, num_sizes=50, min_size=16, max_size=506, num_pos=100, pad_factor=2, multiprocessing=True)
    print(f"---\nFor target D={d_target:.2f}, measured D={measured_d:.3f}")
    # Save or display fractal if desired, e.g., plt.imshow(fractal_img)...

