# fz-PSO: Standalone Example (without fzd)

This notebook demonstrates how to use the **Particle Swarm Optimization (PSO)** algorithm plugin **without** coupling it to a model via `fzd`.

This is useful for:
- Testing the algorithm implementation
- Understanding how the swarm explores the search space
- Visualizing the optimization path

**Requirements:** R and `rpy2` must be installed.

## Setup: Install Dependencies

In [None]:
# Install fz framework and rpy2
%pip install -q git+https://github.com/Funz/fz.git
%pip install -q rpy2

## Example 1: Load the R Algorithm via rpy2

Load the PSO algorithm by sourcing the `.R` file and creating an instance of the S3 class.

In [None]:
from rpy2 import robjects

# Source the R algorithm file
robjects.r.source(".fz/algorithms/PSO.R")
r_globals = robjects.globalenv

# Create an instance with custom options
r_algo = robjects.r["PSO"](
    yminimization=True,
    max_iterations=20,
    nparticles=15,
    seed=123
)
print(f"Algorithm class: {robjects.r['class'](r_algo)[0]}")
print(f"Options: yminimization=True, max_iterations=20, nparticles=15, seed=123")

## Example 2: Define a Test Function

We use the **Branin** function, a classic 2D optimization benchmark with three global minima.

In [None]:
import math

def branin(point):
    """Branin function scaled to [0,1]^2. Has 3 global minima \u2248 0.3979."""
    x1 = point["x1"] * 15 - 5
    x2 = point["x2"] * 15
    return (x2 - 5 / (4 * math.pi**2) * x1**2 + 5 / math.pi * x1 - 6)**2 + \
           10 * (1 - 1 / (8 * math.pi)) * math.cos(x1) + 10

# Define input variable ranges
input_vars = {"x1": (0.0, 1.0), "x2": (0.0, 1.0)}
output_vars = ["branin"]

print(f"Test function: Branin (scaled to [0,1]^2)")
print(f"Input variables: {input_vars}")
print(f"Known global minimum: \u2248 0.3979")

## Example 3: Run the Algorithm Lifecycle

Execute the full PSO lifecycle:
1. `get_initial_design()` \u2014 get initial swarm of particles
2. Evaluate the test function at those points (in Python)
3. `get_next_design()` \u2014 update velocities and positions
4. Repeat until max iterations or convergence
5. `get_analysis()` \u2014 get final results from R

In [None]:
# Step 1: Get initial design from R
r_input_vars = robjects.r('list(x1 = c(0.0, 1.0), x2 = c(0.0, 1.0))')
r_output_vars = robjects.StrVector(["branin"])

r_design = r_globals['get_initial_design'](r_algo, r_input_vars, r_output_vars)

# Convert R design to Python
def r_points_to_python(r_points):
    points = []
    for i in range(len(r_points)):
        point = {}
        r_point = r_points[i]
        for name in r_point.names:
            point[name] = r_point.rx2(name)[0]
        points.append(point)
    return points

design = r_points_to_python(r_design)
print(f"Initial swarm: {len(design)} particles")
for i, point in enumerate(design):
    print(f"  Particle {i}: x1={point['x1']:.4f}, x2={point['x2']:.4f}")

In [None]:
# Step 2: Evaluate and run iterative loop
all_points = list(design)
all_outputs = [branin(p) for p in design]
all_X = r_design
all_Y = robjects.FloatVector(all_outputs)

print(f"Initial evaluations: {len(all_outputs)} particles")
print(f"  Min output: {min(all_outputs):.6f}")

# Step 3: Iterative loop
iteration = 0
while True:
    # Progress report
    try:
        r_tmp = r_globals['get_analysis_tmp'](r_algo, all_X, all_Y)
        if r_tmp is not None and r_tmp.names is not None and 'text' in list(r_tmp.names):
            print(r_tmp.rx2('text')[0])
    except Exception:
        pass

    # Get next design
    r_next = r_globals['get_next_design'](r_algo, all_X, all_Y)

    if len(r_next) == 0:
        print(f"\nAlgorithm finished after {iteration} iterations")
        break

    # Convert, evaluate, accumulate
    next_points = r_points_to_python(r_next)
    new_outputs = [branin(p) for p in next_points]
    all_points.extend(next_points)
    all_outputs.extend(new_outputs)

    # Update R lists
    for i in range(len(r_next)):
        all_X = robjects.r('c')(all_X, robjects.r('list')(r_next[i]))
    all_Y = robjects.FloatVector(all_outputs)

    iteration += 1

print(f"Total evaluations: {len(all_outputs)}")
print(f"Best value found: {min(all_outputs):.6f}")

In [None]:
# Step 4: Get final analysis from R
r_analysis = r_globals['get_analysis'](r_algo, all_X, all_Y)

print("=" * 60)
print("ANALYSIS RESULTS")
print("=" * 60)

if r_analysis.names is not None and 'text' in list(r_analysis.names):
    print(r_analysis.rx2('text')[0])

if r_analysis.names is not None and 'data' in list(r_analysis.names):
    r_data = r_analysis.rx2('data')
    print("Data:")
    for name in r_data.names:
        val = r_data.rx2(name)
        try:
            print(f"  {name}: {val[0]}")
        except Exception:
            print(f"  {name}: {val}")

## Example 4: Visualize Swarm Exploration

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Extract values
x1_vals = [p['x1'] for p in all_points]
x2_vals = [p['x2'] for p in all_points]

fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# Plot 1: Contour with swarm positions
ax = axes[0]
xi = np.linspace(0.0, 1.0, 100)
yi = np.linspace(0.0, 1.0, 100)
Xi, Yi = np.meshgrid(xi, yi)
Zi = np.zeros_like(Xi)
for i in range(100):
    for j in range(100):
        Zi[i, j] = branin({"x1": Xi[i, j], "x2": Yi[i, j]})

ax.contourf(Xi, Yi, np.log10(Zi), levels=20, cmap='viridis', alpha=0.7)

# Color particles by iteration
nparticles = len(design)  # initial swarm size
n_iters = len(all_points) // nparticles
colors = plt.cm.hot(np.linspace(0, 0.8, n_iters))
for it in range(n_iters):
    start = it * nparticles
    end = start + nparticles
    ax.scatter([all_points[k]['x1'] for k in range(start, min(end, len(all_points)))],
              [all_points[k]['x2'] for k in range(start, min(end, len(all_points)))],
              c=[colors[it]], s=20, alpha=0.6, edgecolors='white', linewidth=0.3)

# Mark best
best_idx = all_outputs.index(min(all_outputs))
ax.plot(x1_vals[best_idx], x2_vals[best_idx], 'r*', markersize=15,
        label=f'Best ({min(all_outputs):.4f})', zorder=10)

ax.set_xlabel('x1')
ax.set_ylabel('x2')
ax.set_title('Branin Function - PSO Swarm Exploration')
ax.legend(fontsize=8)

# Plot 2: Convergence history
ax = axes[1]
best_so_far = []
current_best = float('inf')
for y in all_outputs:
    current_best = min(current_best, y)
    best_so_far.append(current_best)

ax.plot(range(1, len(best_so_far) + 1), best_so_far, 'b-o', markersize=2)
ax.axhline(0.3979, color='red', linestyle='--', alpha=0.7, label='Global min \u2248 0.3979')
ax.set_xlabel('Evaluation number')
ax.set_ylabel('Best value found')
ax.set_title('Convergence History')
ax.legend()
ax.set_yscale('log')
ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

## Alternative: Load via fz plugin system

You can also load the algorithm using fz's `load_algorithm()`, which handles the rpy2 wrapping automatically:

In [None]:
from fz.algorithms import load_algorithm

# Load R algorithm via fz (auto-wraps with rpy2)
algo = load_algorithm(".fz/algorithms/PSO.R",
                      yminimization=True, max_iterations=10,
                      nparticles=15, seed=42)

# The wrapper provides the same Python interface
design = algo.get_initial_design({"x1": (0.0, 1.0), "x2": (0.0, 1.0)}, ["result"])
print(f"Got {len(design)} initial particles from R algorithm via fz wrapper")
for i, p in enumerate(design):
    print(f"  Particle {i}: {p}")