In [None]:
import numpy as np
from scipy.sparse import lil_matrix, csr_matrix
from scipy.sparse.linalg import spsolve
# from numba import jit  # 移除 @jit
import matplotlib.pyplot as plt
from multiprocessing import Pool, cpu_count
from functools import partial

# 建议：将单元计算相关的代码放入一个类中，使结构更清晰

def compute_element_data(node_coords, xi, eta):
    """计算单元的 B 矩阵等, 用于刚度矩阵和应力计算"""
    dN_dxi = 0.25 * np.array([
        [eta - 1, 1 - eta, 1 + eta, -eta - 1],
        [xi - 1, -xi - 1, xi + 1, 1 - xi]
    ])
    J = dN_dxi @ node_coords
    detJ = J[0, 0] * J[1, 1] - J[0, 1] * J[1, 0]
    invJ = np.array([[J[1, 1], -J[0, 1]], [-J[1, 0], J[0, 0]]]) / detJ  # 2x2 矩阵的逆
    dN_dx = invJ @ dN_dxi
    B = np.zeros((3, 8))
    for j in range(4):
        B[0, 2 * j] = dN_dx[0, j]
        B[1, 2 * j + 1] = dN_dx[1, j]
        B[2, 2 * j] = dN_dx[1, j]
        B[2, 2 * j + 1] = dN_dx[0, j]
    return B, detJ


def compute_element_matrix(node_coords, E_e, nu, thickness):
    """计算单元刚度矩阵"""
    D = E_e / (1 - nu ** 2) * np.array([[1, nu, 0],
                                         [nu, 1, 0],
                                         [0, 0, (1 - nu) / 2]])

    gauss_points = np.array([(-0.57735026919, -0.57735026919),
                             (0.57735026919, -0.57735026919),
                             (0.57735026919, 0.57735026919),
                             (-0.57735026919, 0.57735026919)])
    weights = np.array([1.0, 1.0, 1.0, 1.0])

    Ke = np.zeros((8, 8))
    for i in range(4):
        xi, eta = gauss_points[i]
        w = weights[i]
        B, detJ = compute_element_data(node_coords, xi, eta) # 调用
        Ke += (B.T @ D @ B) * detJ * w * thickness

    return Ke

def process_element_batch(args):
    """处理一批元素的计算"""
    start_idx, end_idx, elements, nodes, E_values, nu, thickness = args
    results = []
    for i in range(start_idx, end_idx):
        elem = elements[i]
        node_coords = nodes[elem]
        Ke = compute_element_matrix(node_coords, E_values[i], nu, thickness)
        dofs = np.array([[2 * n, 2 * n + 1] for n in elem]).flatten()
        results.append((dofs, Ke))
    return results

def compute_stress_batch(args):
    """并行计算应力"""
    start_idx, end_idx, elements, nodes, U, E_values, nu = args
    stress_values = np.zeros(end_idx - start_idx)

    for local_idx, idx in enumerate(range(start_idx, end_idx)):
        elem = elements[idx]
        node_coords = nodes[elem]
        # u = np.concatenate([U[2*elem[i]:2*elem[i]+2] for i in range(4)]) # 更简洁的写法
        dofs = np.array([[2 * n, 2 * n + 1] for n in elem]).flatten()
        u = U[dofs]


        B, _ = compute_element_data(node_coords, 0.0, 0.0)  # 调用, 中心点应力
        strain = B @ u
        E_e = E_values[idx]
        D = E_e / (1 - nu ** 2) * np.array([[1, nu, 0],
                                             [nu, 1, 0],
                                             [0, 0, (1 - nu) / 2]])
        stress = D @ strain

        s_x, s_y, t_xy = stress
        avg = (s_x + s_y) / 2
        radius = np.sqrt(((s_x - s_y) / 2) ** 2 + t_xy ** 2)
        stress_values[local_idx] = avg + radius

    return stress_values


def main():
    # 参数设置
    nx, ny = 50, 100
    width, height = 50.0, 100.0
    E = 210e3
    nu = 0.3
    thickness = 1.0
    applied_stress = -6.0
    crack_x = (12, 13)
    crack_y = (40, 60)

    # 生成网格
    x = np.linspace(0, width, nx + 1)
    y = np.linspace(0, height, ny + 1)
    X, Y = np.meshgrid(x, y)
    nodes = np.column_stack((X.flatten(), Y.flatten()))

    # 生成单元连接性
    i = np.arange(nx * ny)
    row = i // nx
    col = i % nx
    n0 = row * (nx + 1) + col
    elements = np.column_stack((n0, n0 + 1, n0 + nx + 2, n0 + nx + 1))

    # 标记裂纹区域
    element_centers = np.mean(nodes[elements], axis=1)
    crack_mask = ((crack_x[0] <= element_centers[:, 0]) &
                  (element_centers[:, 0] <= crack_x[1]) &
                  (crack_y[0] <= element_centers[:, 1]) &
                  (element_centers[:, 1] <= crack_y[1]))
    E_values = np.where(crack_mask, 1e-6, E)

    # 并行组装刚度矩阵
    dof = 2 * len(nodes)
    K = lil_matrix((dof, dof))

    num_processes = cpu_count() # 使用所有核心
    chunk_size = len(elements) // num_processes

    with Pool(processes=num_processes) as pool:
        batches = []
        for i in range(num_processes):
            start_idx = i * chunk_size
            end_idx = start_idx + chunk_size if i < num_processes - 1 else len(elements)
            batch = (start_idx, end_idx, elements, nodes, E_values, nu, thickness)
            batches.append(batch)

        results = pool.map(process_element_batch, batches)

    # 组装全局矩阵
    for batch_results in results:
        for dofs, Ke in batch_results:
            for i in range(8):
                for j in range(8):
                    K[dofs[i], dofs[j]] += Ke[i, j]

    # 边界条件
    bottom_nodes = np.where(nodes[:, 1] == 0.0)[0]
    fixed_dofs = np.concatenate([2 * bottom_nodes, 2 * bottom_nodes + 1])

    # 载荷向量
    F = np.zeros(dof)
    top_nodes = np.where(nodes[:, 1] == height)[0]
    force_per_node = applied_stress * width * thickness / len(top_nodes)
    F[2 * top_nodes + 1] = force_per_node

    # 处理边界条件
    for fd in fixed_dofs:
        F[fd] = 0.0
        K[fd, :] = 0
        K[:, fd] = 0
        K[fd, fd] = 1.0

    # 转换为CSR格式并求解
    K = csr_matrix(K)
    U = spsolve(K, F)

    # 并行计算应力
    with Pool(processes=num_processes) as pool:
        stress_batches = []
        for i in range(num_processes):
            start_idx = i * chunk_size
            end_idx = start_idx + chunk_size if i < num_processes - 1 else len(elements)
            batch = (start_idx, end_idx, elements, nodes, U, E_values, nu)
            stress_batches.append(batch)

        stress_results = pool.map(compute_stress_batch, stress_batches)

    # 合并应力结果
    stress_values = np.concatenate(stress_results)
    stress_matrix = stress_values.reshape(ny, nx)

    # 绘图
    plt.figure(figsize=(10, 8))
    plt.imshow(stress_matrix, cmap='jet', extent=[0, width, 0, height], origin='lower')  # origin='lower' 调整坐标系
    plt.colorbar(label='Maximum Principal Stress (MPa)')
    plt.xlabel('Width (mm)')
    plt.ylabel('Height (mm)')
    plt.title('Maximum Principal Stress Distribution')
    plt.show()


if __name__ == "__main__":
    main()

In [None]:
import numpy as np
from scipy.sparse import coo_matrix, csr_matrix
from scipy.sparse.linalg import spsolve
import matplotlib.pyplot as plt
from multiprocessing import Pool, cpu_count
from functools import partial

def compute_element_data(node_coords, xi, eta):
    """计算单元的 B 矩阵等"""
    dN_dxi = 0.25 * np.array([
        [eta - 1, 1 - eta, 1 + eta, -eta - 1],
        [xi - 1, -xi - 1, xi + 1, 1 - xi]
    ])
    J = dN_dxi @ node_coords
    detJ = J[0, 0] * J[1, 1] - J[0, 1] * J[1, 0]
    invJ = np.array([[J[1, 1], -J[0, 1]], [-J[1, 0], J[0, 0]]]) / detJ
    dN_dx = invJ @ dN_dxi
    B = np.zeros((3, 8))
    for j in range(4):
        B[0, 2*j] = dN_dx[0, j]
        B[1, 2*j + 1] = dN_dx[1, j]
        B[2, 2*j] = dN_dx[1, j]
        B[2, 2*j + 1] = dN_dx[0, j]
    return B, detJ

def compute_element_matrix(node_coords, E_e, nu, thickness):
    """计算单元刚度矩阵"""
    D = E_e / (1 - nu**2) * np.array([
        [1, nu, 0],
        [nu, 1, 0],
        [0, 0, (1-nu)/2]
    ])
    
    gauss_points = np.array([
        (-0.57735026919, -0.57735026919),
        (0.57735026919, -0.57735026919),
        (0.57735026919, 0.57735026919),
        (-0.57735026919, 0.57735026919)
    ])
    
    Ke = np.zeros((8, 8))
    for xi, eta in gauss_points:
        B, detJ = compute_element_data(node_coords, xi, eta)
        Ke += B.T @ D @ B * detJ * thickness
    
    return Ke

def process_element_batch(args):
    """处理一批元素的计算，返回COO格式数据"""
    start_idx, end_idx, elements, nodes, E_values, nu, thickness = args
    rows, cols, data = [], [], []
    
    for i in range(start_idx, end_idx):
        elem = elements[i]
        node_coords = nodes[elem]
        Ke = compute_element_matrix(node_coords, E_values[i], nu, thickness)
        dofs = np.array([[2*n, 2*n+1] for n in elem]).flatten()
        
        for i_local in range(8):
            for j_local in range(8):
                rows.append(dofs[i_local])
                cols.append(dofs[j_local])
                data.append(Ke[i_local, j_local])
                
    return np.array(rows), np.array(cols), np.array(data)

def compute_stress_batch(args):
    """并行计算应力"""
    start_idx, end_idx, elements, nodes, U, E_values, nu = args
    stress_values = np.zeros(end_idx - start_idx)
    
    D_base = np.array([[1, nu, 0],
                       [nu, 1, 0],
                       [0, 0, (1-nu)/2]])
    
    for local_idx, idx in enumerate(range(start_idx, end_idx)):
        elem = elements[idx]
        node_coords = nodes[elem]
        dofs = np.array([[2*n, 2*n+1] for n in elem]).flatten()
        u = U[dofs]
        
        B, _ = compute_element_data(node_coords, 0.0, 0.0)
        strain = B @ u
        D = E_values[idx] / (1 - nu**2) * D_base
        stress = D @ strain
        
        s_x, s_y, t_xy = stress
        avg = (s_x + s_y) / 2
        radius = np.sqrt(((s_x - s_y) / 2)**2 + t_xy**2)
        stress_values[local_idx] = avg + radius
        
    return stress_values

def main():
    # 参数设置
    nx, ny = 50, 100
    width, height = 50.0, 100.0
    E = 210e3
    nu = 0.3
    thickness = 1.0
    applied_stress = -6.0
    crack_x = (12, 13)
    crack_y = (40, 60)

    # 生成网格
    x = np.linspace(0, width, nx + 1)
    y = np.linspace(0, height, ny + 1)
    X, Y = np.meshgrid(x, y)
    nodes = np.column_stack((X.flatten(), Y.flatten()))

    # 生成单元
    i = np.arange(nx * ny)
    row = i // nx
    col = i % nx
    n0 = row * (nx + 1) + col
    elements = np.column_stack((n0, n0 + 1, n0 + nx + 2, n0 + nx + 1))

    # 定义裂纹
    element_centers = np.mean(nodes[elements], axis=1)
    crack_mask = ((crack_x[0] <= element_centers[:, 0]) &
                 (element_centers[:, 0] <= crack_x[1]) &
                 (crack_y[0] <= element_centers[:, 1]) &
                 (crack_y[1] >= element_centers[:, 1]))
    E_values = np.where(crack_mask, E * 1e-6, E)

    # 并行组装刚度矩阵
    dof = 2 * len(nodes)
    num_processes = cpu_count()
    chunk_size = len(elements) // num_processes

    rows, cols, data = [], [], []
    with Pool(processes=num_processes) as pool:
        batches = []
        for i in range(num_processes):
            start_idx = i * chunk_size
            end_idx = start_idx + chunk_size if i < num_processes - 1 else len(elements)
            batch = (start_idx, end_idx, elements, nodes, E_values, nu, thickness)
            batches.append(batch)

        results = pool.map(process_element_batch, batches)
        
        for batch_rows, batch_cols, batch_data in results:
            rows.extend(batch_rows)
            cols.extend(batch_cols)
            data.extend(batch_data)

    # 使用COO格式组装全局矩阵
    K = coo_matrix((data, (rows, cols)), shape=(dof, dof)).tocsr()

    # 边界条件
    bottom_nodes = np.where(nodes[:, 1] == 0.0)[0]
    fixed_dofs = np.concatenate([2 * bottom_nodes, 2 * bottom_nodes + 1])

    # 载荷向量
    F = np.zeros(dof)
    top_nodes = np.where(nodes[:, 1] == height)[0]
    force_per_node = applied_stress * width * thickness / len(top_nodes)
    F[2 * top_nodes + 1] = force_per_node

    # 优化边界条件处理
    mask = np.ones(dof, dtype=bool)
    mask[fixed_dofs] = False
    K_red = K[mask][:, mask]
    F_red = F[mask]

    # 求解
    U_red = spsolve(K_red, F_red)
    U = np.zeros(dof)
    U[mask] = U_red

    # 并行计算应力
    with Pool(processes=num_processes) as pool:
        stress_batches = []
        for i in range(num_processes):
            start_idx = i * chunk_size
            end_idx = start_idx + chunk_size if i < num_processes - 1 else len(elements)
            batch = (start_idx, end_idx, elements, nodes, U, E_values, nu)
            stress_batches.append(batch)

        stress_results = pool.map(compute_stress_batch, stress_batches)

    stress_values = np.concatenate(stress_results)
    stress_matrix = stress_values.reshape(ny, nx)

    # 绘图
    plt.figure(figsize=(10, 8))
    plt.imshow(stress_matrix, cmap='jet', extent=[0, width, 0, height], origin='lower')
    plt.colorbar(label='Maximum Principal Stress (MPa)')
    plt.xlabel('Width (mm)')
    plt.ylabel('Height (mm)')
    plt.title('Maximum Principal Stress Distribution')
    plt.show()

if __name__ == "__main__":
    main()