In [2]:
import gymnasium as gym
from gymnasium import spaces
import numpy as np
import matplotlib.pyplot as plt
from fenics import *
from stable_baselines3 import PPO

#########################################
# ALL CODE IN ONE CELL
#########################################

# 1) Create triangle mask function
def create_triangle_mask(Nx, Ny, L, H):
    x_vals = np.linspace(0, L, Nx)
    y_vals = np.linspace(0, H, Ny)
    X, Y = np.meshgrid(x_vals, y_vals)

    # Example triangle vertices
    v1 = np.array([L/2, 0.8*H])   # top
    v2 = np.array([0.2*L, 0.2*H]) # bottom-left
    v3 = np.array([0.8*L, 0.2*H]) # bottom-right

    def is_inside_triangle(pt, v1, v2, v3):
        d1 = (pt[0] - v2[0])*(v1[1] - v2[1]) - (v1[0] - v2[0])*(pt[1] - v2[1])
        d2 = (pt[0] - v3[0])*(v2[1] - v3[1]) - (v2[0] - v3[0])*(pt[1] - v3[1])
        d3 = (pt[0] - v1[0])*(v3[1] - v1[1]) - (v3[0] - v1[0])*(pt[1] - v1[1])
        has_neg = (d1 < 0) or (d2 < 0) or (d3 < 0)
        has_pos = (d1 > 0) or (d2 > 0) or (d3 > 0)
        return not (has_neg and has_pos)

    mask = np.zeros((Ny, Nx), dtype=np.float64)
    for j in range(Ny):
        for i in range(Nx):
            pt = np.array([X[j, i], Y[j, i]])
            mask[j, i] = 1.0 if is_inside_triangle(pt, v1, v2, v3) else 0.0

    return mask

# 2) Custom FEniCS expression for wave summation
class StaticTransducerWave(UserExpression):
    def __init__(self, A, px, py, pz, **kwargs):
        super().__init__(**kwargs)
        self.A  = A
        self.px = px
        self.py = py
        self.pz = pz

    def eval(self, value, x):
        val = 0.0
        for Ai, px_i, py_i, pz_i in zip(self.A, self.px, self.py, self.pz):
            dx = x[0] - px_i
            dy = x[1] - py_i
            dz = x[2] - pz_i
            r2 = dx*dx + dy*dy + dz*dz
            val += Ai * np.exp(-100.0 * r2)
        value[0] = val

# 3) Define the AcousticEnv Gym environment
class AcousticEnv(gym.Env):
    def __init__(self):
        super().__init__()
        
        # Domain geometry
        self.L, self.H, self.W = 0.1, 0.1, 0.1
        self.nx, self.ny, self.nz = 30, 30, 30
        self.mesh = BoxMesh(Point(0, 0, 0),
                            Point(self.L, self.H, self.W),
                            self.nx, self.ny, self.nz)
        self.V = FunctionSpace(self.mesh, \"P\", 1)
        self.bc = DirichletBC(self.V, Constant(0.0), \"on_boundary\")

        # Transducer positions
        self.transducer_positions = [
            (0.02, 0.02, 0.09), (0.02, 0.08, 0.09), (0.05, 0.05, 0.09),
            (0.08, 0.02, 0.09), (0.08, 0.08, 0.09), (0.02, 0.05, 0.09),
            (0.08, 0.05, 0.09), (0.01, 0.02, 0.05), (0.01, 0.08, 0.05),
            (0.01, 0.05, 0.02), (0.01, 0.05, 0.08), (0.09, 0.02, 0.05),
            (0.09, 0.08, 0.05), (0.09, 0.05, 0.02), (0.09, 0.05, 0.08)
        ]
        self.n_transducers = len(self.transducer_positions)

        # Each transducer can be ON (1) or OFF (0)
        self.action_space = spaces.MultiBinary(self.n_transducers)

        # Observations: flattened Nx×Ny slice
        self.observation_space = spaces.Box(
            low=-np.inf, high=np.inf, shape=(self.nx*self.ny,),
            dtype=np.float32
        )

        # Precompute the triangle mask
        self.triangle_mask = create_triangle_mask(
            self.nx, self.ny, self.L, self.H
        ).flatten()

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        # Start with all transducers ON
        init_action = np.ones(self.n_transducers, dtype=int)
        p_sol = self.solve_acoustic_field(init_action)
        obs = self.sample_pressure_slice(p_sol)
        return obs, {}

    def step(self, action):
        # Solve PDE with ON/OFF transducers
        p_sol = self.solve_acoustic_field(action)
        obs = self.sample_pressure_slice(p_sol)
        reward = self.calculate_reward(obs)
        done = False
        info = {}
        return obs, reward, done, info

    def solve_acoustic_field(self, action):
        # 0 -> 0.0 amplitude, 1 -> 1e6 amplitude
        amplitudes = np.array(action, dtype=float) * 1e6

        u = TrialFunction(self.V)
        v = TestFunction(self.V)
        wave_expr = StaticTransducerWave(
            A=amplitudes,
            px=[p[0] for p in self.transducer_positions],
            py=[p[1] for p in self.transducer_positions],
            pz=[p[2] for p in self.transducer_positions],
            degree=2
        )

        a = inner(grad(u), grad(v)) * dx
        rhs = wave_expr * v * dx
        u_sol = Function(self.V)
        solve(a == rhs, u_sol, self.bc,
              solver_parameters={\"linear_solver\":\"gmres\",\"preconditioner\":\"ilu\"})
        return u_sol

    def sample_pressure_slice(self, u_sol, z_slice=0.05):
        x_vals = np.linspace(0, self.L, self.nx)
        y_vals = np.linspace(0, self.H, self.ny)
        P = np.zeros((self.ny, self.nx))
        for j, yy in enumerate(y_vals):
            for i, xx in enumerate(x_vals):
                P[j, i] = u_sol(xx, yy, z_slice)
        return P.flatten()

    def calculate_reward(self, pressure_slice):
        p_min, p_max = pressure_slice.min(), pressure_slice.max()
        # Normalize [0..1] to compare with mask
        if abs(p_max - p_min) < 1e-12:
            p_norm = np.zeros_like(pressure_slice)
        else:
            p_norm = (pressure_slice - p_min)/(p_max - p_min)
        mse = np.mean((p_norm - self.triangle_mask)**2)
        return -mse

# 4) Train and test PPO
env = AcousticEnv()
model = PPO('MlpPolicy', env, verbose=1)

print(\"Training PPO...this might take a while on a large mesh.\")
model.learn(total_timesteps=500)  # Use more steps for better results

print(\"\\nTesting final policy...\")
obs, _ = env.reset()
action, _states = model.predict(obs)
obs, reward, done, info = env.step(action)

print(\"Final action:\", action)
print(\"Final reward:\", reward)

# Visualize final pressure
Nx, Ny = env.nx, env.ny
final_pressure_2d = obs.reshape((Ny, Nx))

plt.figure(figsize=(6,5))
plt.imshow(final_pressure_2d, origin='lower',
           extent=[0, env.L, 0, env.H], aspect='equal')
plt.colorbar(label='Pressure (normalized scale)')
plt.title('Final Pressure Slice at z=0.05 m')
plt.xlabel('x [m]')
plt.ylabel('y [m]')
plt.tight_layout()
plt.show()


SyntaxError: unexpected character after line continuation character (2356489503.py, line 69)