In [None]:
import os
import logging
import threading
from queue import Queue

from dolfin import *
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
# from pyswarm import pso
import pyswarms as ps

In [None]:
# Initialisation: switch off DOLFIN's own log output
set_log_active(False)
# Configure logging: the root logger emits INFO and above to the console,
# formatted as the bare message text.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter('%(message)s'))
# Attach the handler only when the root logger has none yet. Calling
# basicConfig() and then addHandler() unconditionally installs two stream
# handlers, so every message is printed twice, and each re-run of this cell
# would stack one more.
if not logger.handlers:
    logger.addHandler(console_handler)

# Mesh resolution: number of divisions along x and y
nelx = 10
nely = 10

# 2D orthotropic material constants
# (presumably moduli E1, E2, G12 and Poisson ratio mu12; units are not stated here)
E1 = 130000
E2 = 7700
mu12 = 0.33
G12 = 4800

# Applied load vector
forcing = Constant((0, -500))

# Elasticity (plane-stress reduced stiffness) matrix in material axes,
# Voigt order [11, 22, 12].
# For an orthotropic material the minor Poisson ratio follows from
# reciprocity, mu21 = mu12 * E2 / E1, and the common denominator is
# 1 - mu12 * mu21. The form 1 - mu12**2 is only valid when E1 == E2.
mu21 = mu12 * E2 / E1
_Q_denom = 1 - mu12 * mu21
C2D_Iso = np.array([
    [E1 / _Q_denom,          mu12 * E2 / _Q_denom,   0  ],
    [mu12 * E2 / _Q_denom,   E2 / _Q_denom,          0  ],
    [0,                      0,                      G12]
])

# Axis-rotation formula
def T2D_inv(theta):
    """Build the 3x3 rotation matrix for an in-plane angle given in degrees.

    The angle is converted to radians; with ``c = cos(angle)`` and
    ``s = sin(angle)`` the rows are::

        [c^2,  s^2, -2sc]
        [s^2,  c^2,  2sc]
        [sc,  -sc,   c^2 - s^2]

    Returns the matrix as a NumPy array with three rows of three entries.
    """
    angle = theta * np.pi / 180.0
    c, s = cos(angle), sin(angle)
    c2, s2 = c**2, s**2
    sc = s * c
    two_sc = 2 * s * c
    rows = [
        [c2, s2, -two_sc],
        [s2, c2, two_sc],
        [sc, -sc, c2 - s2],
    ]
    return np.array(rows)

# Set up the mesh, the function spaces and the functions used by the solver
# Unit-square mesh with nelx x nely divisions
mesh = UnitSquareMesh(nelx, nely)
# V: vector-valued CG1 space for the displacement
V = VectorFunctionSpace(mesh, "CG", 1)
# T: DG0 space for the angle field
T = FunctionSpace(mesh, "DG", 0)
# Solution holders that fitness_func overwrites on every evaluation
u_sol = Function(V)
Theta_sol = Function(T)
# Trial/test functions used to build the variational forms
u_trial = TrialFunction(V)
v_test = TestFunction(V)

In [11]:
import pyvista as pv
from pyvista import examples

# Deliberately NOT bound to `mesh`: that name holds the dolfin UnitSquareMesh
# which later cells still need (MeshFunction, cells(mesh), ds(domain=mesh)).
# Rebinding it to a pyvista object breaks every cell that follows.
pv_sphere = pv.Icosphere()
mesh2 = examples.load_nut()

# Render the example "nut" mesh as a black wireframe
pl = pv.Plotter()
pl.add_mesh(mesh2, style='wireframe', color='k', line_width=2)
pl.disable_anti_aliasing()
pl.camera.zoom(1.5)
pl.show()


Widget(value='<iframe src="http://localhost:45859/index.html?ui=P_0x7f38cc39bb00_10&reconnect=auto" class="pyv…

In [None]:
# Kinematics: strain-displacement relation
def epsilon(u):
    """Return the symmetric part of ``nabla_grad(u)``.

    Computed as ``0.5 * (G + G.T)`` with ``G = nabla_grad(u)``, so the
    off-diagonal entries are the tensor shear components.
    """
    grad_u = nabla_grad(u)
    return 0.5 * (grad_u + grad_u.T)

# Constitutive law
def sigma_tensor(u, Theta_Ev):
    """Stress tensor for displacement ``u`` and rotation angle ``Theta_Ev``.

    Args:
        u: Displacement (trial function or solved function) passed to
            ``epsilon``.
        Theta_Ev: Rotation angle in degrees, forwarded to ``T2D_inv``.

    Returns:
        A 2x2 symmetric UFL tensor ``[[s11, s12], [s12, s22]]``.
    """
    strain = epsilon(u)
    # The stiffness matrix is written in Voigt form, whose third strain
    # entry is the engineering shear gamma_12 = 2 * eps_12. Passing the
    # tensor component eps_12 unscaled would halve the shear stress.
    ep = as_vector([strain[0, 0], strain[1, 1], 2 * strain[0, 1]])

    # Rotate the material-axis stiffness into the global frame; the rotation
    # matrix is built once and reused for both factors.
    T_inv = T2D_inv(Theta_Ev)
    Q_bar = np.dot(np.dot(T_inv, C2D_Iso), T_inv.T)
    sigma_ij = np.dot(Q_bar, ep)

    return as_tensor([[sigma_ij[0], sigma_ij[2]],
                      [sigma_ij[2], sigma_ij[1]]])

# Elastic energy density
def psi(u, Theta):
    """Return ``0.5 * inner(sigma, eps)`` for displacement ``u`` and angle ``Theta``."""
    stress = sigma_tensor(u, Theta)
    strain = epsilon(u)
    return 0.5 * inner(stress, strain)

In [None]:
# Boundary definitions
def clamped_boundary(x, on_boundary):
    """Select boundary points whose first coordinate is below ``DOLFIN_EPS``."""
    if not on_boundary:
        return on_boundary
    return x[0] < DOLFIN_EPS

# Dirichlet condition: both displacement components are fixed to zero on
# the points selected by clamped_boundary.
bc = DirichletBC(V, Constant((0.0, 0.0)), clamped_boundary)

class RightEnd(SubDomain):
    """Boundary points with x[0] within DOLFIN_EPS of 1 and |x[1] - 0.5| < 0.5."""

    def inside(self, x, on_boundary):
        if not on_boundary:
            return on_boundary
        at_right_edge = abs(x[0] - 1) < DOLFIN_EPS
        # Strict inequality: points with x[1] exactly 0 or 1 are rejected.
        within_height = abs(x[1] - 0.5) < 0.5
        return at_right_edge and within_height


right_end_boundary = RightEnd()

class TopEnd(SubDomain):
    """Boundary points with x[1] within DOLFIN_EPS of 1 and |x[0] - 0.5| < 0.1."""

    def inside(self, x, on_boundary):
        if not on_boundary:
            return on_boundary
        near_centre = abs(x[0] - 0.5) < 0.1
        at_top_edge = abs(x[1] - 1) < DOLFIN_EPS
        return near_centre and at_top_edge


top_end_boundary = TopEnd()

# Facet markers (entities of dimension topological-dim - 1):
# every facet starts at 0, RightEnd facets get 1, TopEnd facets get 2.
boundary_mark = MeshFunction("size_t", mesh, mesh.topology().dim()-1)
boundary_mark.set_all(0)
right_end_boundary.mark(boundary_mark, 1)
# Applied last, so tag 2 would presumably override tag 1 on any facet
# claimed by both subdomains.
top_end_boundary.mark(boundary_mark, 2)

In [None]:
# Finite element solve
def FEA(V:VectorFunctionSpace, lhs, rhs, u:Function, bc:DirichletBC) -> Function:
    """Solve the linear variational problem ``lhs == rhs`` and return the solution.

    Args:
        V: Function space of the unknown. Not used by the solve itself; kept
            so existing call sites stay valid.
        lhs: Bilinear form.
        rhs: Linear form.
        u: Function that receives the solution (modified in place).
        bc: Dirichlet boundary condition applied to the problem.

    Returns:
        ``u``, after it has been overwritten with the solution.
    """
    problem = LinearVariationalProblem(lhs, rhs, u, bc)
    solver = LinearVariationalSolver(problem)
    # solve() writes into ``u``; its own return value is not the solution,
    # so hand back ``u`` to honour the declared return type.
    solver.solve()
    return u

In [None]:
# Fitness function
# Histories filled by fitness_func, one entry per call:
energy = []                 # assembled elastic energy
fitness_history = []        # energy + weighted smoothness penalty
theta_values_history = []   # copy of the evaluated angle vector


def fitness_func(solution):
    """Evaluate one candidate angle field and return its fitness.

    The candidate is reshaped to ``T.dim()`` values and written into
    ``Theta_sol``, the elasticity problem is solved into ``u_sol``, and the
    integral of ``psi`` over the mesh is assembled. A smoothness penalty is
    then added: the squared angle difference between every cell and each
    other cell reported by ``facet.entities(2)`` for its facets, summed and
    divided by the number of cells.

    Side effects: overwrites ``Theta_sol`` and ``u_sol`` and appends to
    ``energy``, ``fitness_history`` and ``theta_values_history``.

    Args:
        solution: NumPy array holding ``T.dim()`` angles in degrees.

    Returns:
        ``energy + 100 * smoothness_penalty``.
    """
    n_angles = T.dim()
    theta_values = solution.reshape(n_angles)

    # Load the candidate angles into the shared DG0 field.
    candidate_field = Function(T)
    candidate_field.vector()[:] = theta_values
    Theta_sol.assign(candidate_field)

    # Weak form: stiffness on the left, load on boundary tag 1 on the right.
    lhs = inner(sigma_tensor(u_trial, Theta_sol), nabla_grad(v_test)) * dx
    loaded_edge = ds(subdomain_data=boundary_mark, domain=mesh, subdomain_id=1)
    rhs = dot(forcing, v_test) * loaded_edge
    FEA(V, lhs, rhs, u_sol, bc)

    strain_energy = assemble(psi(u_sol, Theta_sol) * dx)
    energy.append(strain_energy)

    # NOTE(review): indexing theta_values by cell index assumes DG0 dof i
    # belongs to cell i -- confirm against the dofmap.
    smoothness_penalty = 0
    for cell in cells(mesh):
        own_index = cell.index()
        own_theta = theta_values[own_index]
        for facet in facets(cell):
            for other_index in facet.entities(2):
                if other_index >= n_angles or other_index == own_index:
                    continue
                smoothness_penalty += (own_theta - theta_values[other_index]) ** 2

    smoothness_penalty /= mesh.num_cells()

    total_cost = strain_energy + 100 * smoothness_penalty
    fitness_history.append(total_cost)
    theta_values_history.append(theta_values.copy())
    return total_cost

In [None]:
# PSO parameters
# One design variable per dof of T, each bounded to [-90, 90]
lb = [-90] * T.dim()
ub = [90] * T.dim()
# Number of particles and number of PSO iterations
swarm_size = 50
max_iter = 100

# Run PSO and report the iteration count
def iteration_callback(iteration, best_fitness):
    """Log the iteration number and the current best fitness at INFO level.

    Logging methods do not accept ``print``'s ``end`` keyword; passing it
    raises ``TypeError``, so the message is logged as a normal record.
    """
    logger.info(f"Iteration: {iteration}, Best Fitness: {best_fitness}")

# Animated display: fitness curve on the left, angle field on the right
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 6))

# Initialise the fitness-history plot
# NOTE(review): fitness_history gets one entry per fitness_func call, so the
# curve is indexed by evaluation, while the x-limit below is max_iter.
# NOTE(review): the y-limit is taken from fitness_history as it is when this
# cell runs; if the history is still empty the upper limit is 1.
line1, = ax1.plot([], [], 'b-')
ax1.set_xlim(0, max_iter)
ax1.set_ylim(0, max(fitness_history) if fitness_history else 1)
ax1.set_xlabel("Iteration")
ax1.set_ylabel("Fitness Value")
ax1.set_title("Fitness Value Over Iterations")

# Initialise the vector-field plot: one unit arrow per cell, placed at the
# cell midpoint and pointing along the angle (all zeros if no history yet)
theta_rad = np.deg2rad(theta_values_history[0]) if theta_values_history else np.zeros(T.dim())
x_values = np.cos(theta_rad)
y_values = np.sin(theta_rad)
vector_values = np.vstack((x_values, y_values)).T
cell_midpoints = [cell.midpoint() for cell in cells(mesh)]
x_coords = np.array([p.x() for p in cell_midpoints])
y_coords = np.array([p.y() for p in cell_midpoints])
# u, v: arrow components; init() reuses them to reset the quiver
u = vector_values[:, 0]
v = vector_values[:, 1]
quiver = ax2.quiver(x_coords, y_coords, u, v)
ax2.set_title("Vector Field")
ax2.set_xlabel("x")
ax2.set_ylabel("y")
ax2.axis("equal")

def init():
    """Animation init hook: empty the fitness curve and reset the arrows to ``u``, ``v``."""
    empty_x, empty_y = [], []
    line1.set_data(empty_x, empty_y)
    quiver.set_UVC(u, v)
    artists = (line1, quiver)
    return artists

def update(frame):
    """Redraw both panels for evaluation index ``frame``.

    Args:
        frame: Index into ``fitness_history`` / ``theta_values_history``.

    Returns:
        The ``(line1, quiver)`` artists, as blitting requires.
    """
    # Slice first and derive the x-range from the slice, so x and y always
    # have the same length even if ``frame`` is ahead of the recorded data.
    shown = fitness_history[:frame + 1]
    line1.set_data(range(len(shown)), shown)

    if shown:
        # The axis limits were chosen before any fitness value existed, and
        # the history grows by one entry per evaluation rather than per PSO
        # iteration. Widen the limits when the data outgrows them and request
        # a full redraw so the tick labels follow.
        x_hi = ax1.get_xlim()[1]
        y_hi = ax1.get_ylim()[1]
        peak = max(shown)
        if len(shown) > x_hi or peak > y_hi:
            ax1.set_xlim(0, max(x_hi, 2 * len(shown)))
            ax1.set_ylim(0, max(y_hi, 1.1 * peak))
            fig.canvas.draw_idle()

    # The angle history is appended right after the fitness history, so it
    # can briefly be one entry short while the optimiser thread is running.
    if frame < len(theta_values_history):
        theta_rad = np.deg2rad(theta_values_history[frame])
        quiver.set_UVC(np.cos(theta_rad), np.sin(theta_rad))

    return line1, quiver

def run_pso():
    """Run the particle-swarm optimisation and publish the result.

    Uses ``pyswarms`` (imported as ``ps``); the bare ``pso`` function is not
    available because its import is commented out. The result is stored in
    the globals ``best_solution`` (angle vector) and ``best_fitness`` (its
    cost), then printed.
    """
    global best_solution, best_fitness

    def swarm_cost(positions):
        # pyswarms passes the whole swarm, shape (n_particles, dimensions),
        # and expects one cost per particle; fitness_func scores one
        # particle at a time.
        return np.array([fitness_func(particle) for particle in positions])

    optimizer = ps.single.GlobalBestPSO(
        n_particles=swarm_size,
        dimensions=len(lb),
        # Inertia / cognitive / social weights (believed to match the
        # defaults of pyswarm's pso -- confirm if exact parity matters).
        options={'c1': 0.5, 'c2': 0.5, 'w': 0.5},
        bounds=(np.array(lb, dtype=float), np.array(ub, dtype=float)),
    )
    # optimize() returns (best cost, best position), in that order.
    best_fitness, best_solution = optimizer.optimize(swarm_cost, iters=max_iter)
    print(f"Best Solution: {best_solution}")
    print(f"Best Fitness: {best_fitness}")

# Run PSO on a worker thread
pso_thread = threading.Thread(target=run_pso)
pso_thread.start()


def _live_frames():
    """Yield the index of the newest fully recorded evaluation.

    A callable passed as ``frames`` must return an iterator of frames, not a
    number. This generator keeps yielding while the optimiser thread is
    alive and yields once more after it ends so the final state is drawn.
    """
    while True:
        # Read the liveness flag first so entries appended just before the
        # thread exits are still shown on the last pass.
        running = pso_thread.is_alive()
        # theta_values_history is appended after fitness_history, so its
        # length is a safe upper bound for indexing both.
        recorded = len(theta_values_history)
        if recorded:
            yield recorded - 1
        if not running:
            return
        if not recorded:
            # Nothing to draw yet: wait briefly for the first evaluation.
            pso_thread.join(timeout=0.05)


ani = FuncAnimation(fig, update, frames=_live_frames, init_func=init, blit=True, repeat=False, interval=100)
plt.show()

# Wait for PSO to finish
pso_thread.join()

# Visualise the optimisation results
if theta_values_history:
    # Fitness of every evaluation, in call order
    plt.figure()
    plt.plot(fitness_history, label='Fitness Energy')
    plt.legend()
    plt.show()

    # One curve per design variable across evaluations
    plt.figure()
    plt.plot(np.array(theta_values_history))
    plt.legend(['Theta ' + str(i) for i in range(len(theta_values_history[0]))])
    plt.show()
else:
    # Indexing theta_values_history[0] would raise IndexError when the
    # optimiser recorded nothing (for example because it failed on start-up).
    print("No fitness evaluations were recorded; skipping result plots.")