In [1]:
from functools import wraps
from pathlib import Path
from typing import Union, List
import os
import json
from multiprocessing import Pool
from pprint import pprint

import numpy as np
import openfoamparser_mai as Ofpp
import pyvista
import h5py

In [2]:
def save_json(filename, data, save_path) -> None:
    """Cохраняет json"""
    file_path = save_path / Path(filename)
    with open(file_path, 'w', encoding="utf8") as f:
        json.dump(data, f)


def save_json_in_chunks(filename, data, save_path, chunk_size=1000):
    full_path = os.path.join(save_path, filename)
    with open(full_path, 'w') as file:
        file.write('[')
        for i, item in enumerate(data):
            json_str = json.dumps(item)
            file.write(json_str)
            if i < len(data) - 1:
                file.write(',\n')
            if i % chunk_size == 0 and i != 0:
                file.flush()  # Flush data to disk periodically
        file.write(']')


# The wrapper function for multiprocessing
def save_json_in_chunks_wrapper(args):
    save_json_in_chunks(*args)


def json_streaming_writer(filepath, data_func, data_args):
    """Write JSON data to a file using a generator to minimize memory usage."""
    data_gen = data_func(*data_args)
    try:
        with open(filepath, 'w') as file:
            print(f"writing {filepath}")
            file.write('[')
            for i, item in enumerate(data_gen):
                if i != 0:  # Add a comma before all but the first item
                    file.write(',')
                json.dump(item, file)
            file.write(']')
        print(f"Finished writing {filepath}")
    except Exception as e:
        print(f"Failed to write {filepath}: {str(e)}") 


def create_nodes_gen(mesh_bin):
    """Generator for nodes."""
    for point in mesh_bin.points:
        yield {
            'X': point[0],
            'Y': point[1],
            'Z': point[2]
        }


def create_faces_gen(mesh_bin):
    """Generator for faces."""
    for face in mesh_bin.faces:
        yield list(face)


def create_elements_gen(mesh_bin, p, u, c):
    """Generator for elements."""
    for i, cell in enumerate(mesh_bin.cell_faces):
        yield {
            'Faces': cell,
            'Pressure': p[i],
            'Velocity': {
                'X': u[i][0],
                'Y': u[i][1],
                'Z': u[i][2]
            },
            'VelocityModule': np.linalg.norm(u[i]),
            'Position': {
                'X': c[i][0],
                'Y': c[i][1],
                'Z': c[i][2]
            }
        }


def create_surfaces_gen(surfaces):
    """Generator for surfaces."""
    for surface in surfaces:
        yield surface


def _face_center_position(points: list, mesh: Ofpp.FoamMesh) -> list:
    vertecis = [mesh.points[p] for p in points]
    vertecis = np.array(vertecis)
    return list(vertecis.mean(axis=0))

In [3]:
def process_computational_domain(solver_path: Union[str, os.PathLike, Path],
                                 save_path: Union[str, os.PathLike, Path],
                                 p: np.ndarray,
                                 u: np.ndarray,
                                 c: np.ndarray,
                                 surface_name: str) -> None:
    """Сохранение геометрии расчетной области в виде json файла с полями:
    'Nodes' - List[x: float, y: float, z:float], 
    'Faces' - List [List[int]], 
    'Elements' - List [Dict{Faces: List[int],
                            Pressure: float,
                            Velocity: List[float],
                            VelocityModule: float,
                            Position: List[float]}
                            ], 
    'Surfaces' - List[
                    Tuple[Surface_name: str, 
                    List[Dict{ParentElementID: int,
                              ParentFaceId: int,
                              Position: List[float]}]
                    ]

    Args:
        solver_path (Union[str, os.PathLike, Path]): Путь до папки с расчетом.
        save_path (Union[str, os.PathLike, Path]): Путь для сохранения итогового json.
        p (np.ndarray): Поле давления.
        u (np.ndarray): Поле скоростей.
        c (np.ndarray): Центры ячеек.
        surface_name (str): Имя для поверхности.
    """
    
    # Step 0: parse mesh and scale vertices
    mesh_bin = Ofpp.FoamMesh(solver_path )

    # Step I: compute TFemFace_Surface
    domain_names = ["obstacle".encode('ascii')]
    surfaces = []

    for i, domain_name in enumerate(domain_names):
        bound_cells = list(mesh_bin.boundary_cells(domain_name))

        boundary_faces = []
        boundary_faces_cell_ids = []
        for bc_id in bound_cells:
            faces = mesh_bin.cell_faces[bc_id]
            for f in faces:
                if mesh_bin.is_face_on_boundary(f, domain_name):
                    boundary_faces.append(f)
                    boundary_faces_cell_ids.append(bc_id)

        f_b_set = set(zip(boundary_faces, boundary_faces_cell_ids))

        body_faces = []
        for f, b in f_b_set:
            try:
                position = _face_center_position(mesh_bin.faces[f], mesh_bin)
                d = {'ParentElementID': b,
                    'ParentFaceId': f,
                    'Position': {'X': position[0], 'Y': position[1], 'Z': position[2]}
                    }
                body_faces.append(d)
            except IndexError:
                print(f'Indexes for points: {f} is not valid!')

        surfaces.append({'Item1': surface_name,
                'Item2': body_faces}) 
    
    # Define file paths
    nodes_path = os.path.join(save_path, 'Nodes.json')
    faces_path = os.path.join(save_path, 'Faces.json')
    elements_path = os.path.join(save_path, 'Elements.json')
    surfaces_path = os.path.join(save_path, 'Surfaces.json')

    # Prepare arguments for the multiprocessing function
    
    tasks = [
    (nodes_path, create_nodes_gen, (mesh_bin,)),
    (faces_path, create_faces_gen, (mesh_bin,)),
    (elements_path, create_elements_gen, (mesh_bin, p, u, c)),
    (surfaces_path, create_surfaces_gen, (surfaces,))
        ]

    # Use multiprocessing pool
    with Pool() as pool:
        pool.starmap(json_streaming_writer, tasks)

Пример загрузки данных о поле давлений на поверхности тела в Numpy формате

In [4]:
def pressure_field_on_surface(solver_path: Union[str, os.PathLike, Path],
                                 p: np.ndarray,
                                 surface_name: str = 'Surface') -> None:
    """Поле давлений на поверхности тела:
    'Nodes' - List[x: float, y: float, z:float], 
    'Faces' - List [List[int]], 
    'Elements' - List [Dict{Faces: List[int],
                            Pressure: float,
                            Velocity: List[float],
                            VelocityModule: float,
                            Position: List[float]}
                            ], 
    'Surfaces' - List[
                    Tuple[Surface_name: str, 
                    List[Dict{ParentElementID: int,
                              ParentFaceId: int,
                              Position: List[float]}]
                    ]

    Args:
        solver_path (Union[str, os.PathLike, Path]): Путь до папки с расчетом.
        p (np.ndarray): Поле давления.
        surface_name (str): Имя для поверхности.
    """
    
    # Step 0: parse mesh and scale vertices
    mesh_bin = Ofpp.FoamMesh(solver_path )

    # Step I: compute TFemFace_Surface
    domain_names = ["obstacle".encode('ascii')]
    surfaces = []

    for i, domain_name in enumerate(domain_names):
        bound_cells = list(mesh_bin.boundary_cells(domain_name))

        boundary_faces = []
        boundary_faces_cell_ids = []
        for bc_id in bound_cells:
            faces = mesh_bin.cell_faces[bc_id]
            for f in faces:
                if mesh_bin.is_face_on_boundary(f, domain_name):
                    boundary_faces.append(f)
                    boundary_faces_cell_ids.append(bc_id)

        f_b_set = set(zip(boundary_faces, boundary_faces_cell_ids))

        body_faces = []
        for f, b in f_b_set:
            try:
                position = _face_center_position(mesh_bin.faces[f], mesh_bin)
                d = {'ParentElementID': b,
                    'ParentFaceId': f,
                    'CentrePosition': {'X': position[0], 'Y': position[1], 'Z': position[2]},
                    'PressureValue': p[b]
                    }
                body_faces.append(d)
            except IndexError:
                print(f'Indexes for points: {f} is not valid!')

        surfaces.append({'Item1': surface_name,
                'Item2': body_faces}) 
        

        return surfaces

In [12]:
PATH_TO_CASE = 'data_wage/low_dim/vel3.04040404040404'
END_TIME = '0.02'

In [13]:
base_path = Path(PATH_TO_CASE)
time_path = base_path / Path(END_TIME)
p_path = time_path / Path('p')
p = Ofpp.parse_internal_field(p_path)

In [14]:
surface = pressure_field_on_surface(base_path, p)

In [25]:
for s in surface[0]['Item2'][0]:
    pprint(s)
    break

'ParentElementID'


In [16]:
with h5py.File("first_h5.hdf5", "w") as f:
    dset = f.create_dataset("h5file", data = p, dtype = 'i')

In [32]:
surface[0]['Item2']

[{'ParentElementID': 34,
  'ParentFaceId': 169,
  'CentrePosition': {'X': -0.137178, 'Y': 0.04572, 'Z': -0.005},
  'PressureValue': 2.54682},
 {'ParentElementID': 27,
  'ParentFaceId': 162,
  'CentrePosition': {'X': 0.13716, 'Y': 0.036751911235000004, 'Z': 0.0},
  'PressureValue': 1.82475},
 {'ParentElementID': 33,
  'ParentFaceId': 168,
  'CentrePosition': {'X': -0.137178, 'Y': 0.01524, 'Z': -0.005},
  'PressureValue': 2.38182},
 {'ParentElementID': 26,
  'ParentFaceId': 161,
  'CentrePosition': {'X': 0.10668, 'Y': 0.02858481985, 'Z': 0.0},
  'PressureValue': 1.61031},
 {'ParentElementID': 28,
  'ParentFaceId': 163,
  'CentrePosition': {'X': 0.16763999999999998,
   'Y': 0.044919002619999995,
   'Z': 0.0},
  'PressureValue': 1.9403},
 {'ParentElementID': 32,
  'ParentFaceId': 167,
  'CentrePosition': {'X': 0.28956000000000004, 'Y': 0.07758736816, 'Z': 0.0},
  'PressureValue': 2.26735},
 {'ParentElementID': 25,
  'ParentFaceId': 160,
  'CentrePosition': {'X': 0.07619999999999999, 'Y': 0

In [51]:
x = surface[0]['Item2'][0]
sur_item2 =  surface[0]['Item2']

In [68]:

PEID = [x['ParentElementID'] for x in sur_item2]
PFID = [x['ParentElementID'] for x in sur_item2]
PV =   [x['PressureValue'] for x in sur_item2]
X = [x['CentrePosition']['X'] for x in sur_item2]
Y = [x['CentrePosition']['Y'] for x in sur_item2]
Z = [x['CentrePosition']['Z'] for x in sur_item2]


In [69]:
matr = [PEID, PFID, PV, X, Y, Z]

In [70]:
matr

[[34, 27, 33, 26, 28, 32, 25, 29, 31, 30],
 [34, 27, 33, 26, 28, 32, 25, 29, 31, 30],
 [2.54682,
  1.82475,
  2.38182,
  1.61031,
  1.9403,
  2.26735,
  1.27539,
  2.01852,
  2.17168,
  2.09109],
 [-0.137178,
  0.13716,
  -0.137178,
  0.10668,
  0.16763999999999998,
  0.28956000000000004,
  0.07619999999999999,
  0.19811999999999996,
  0.25908,
  0.2286],
 [0.04572,
  0.036751911235000004,
  0.01524,
  0.02858481985,
  0.044919002619999995,
  0.07758736816,
  0.020417728465,
  0.053086094005,
  0.069420276775,
  0.06125318539],
 [-0.005, 0.0, -0.005, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]]

In [72]:

matrix1 = matr

hdf = h5py.File('test.hdf5', 'w')

ds1 = hdf.create_dataset('ds1', data=matrix1, dtype=np.float64)

ds1.attrs['ParentElementID'] = [PEID]
# без понятия что это
ds1.attrs['ParentFaceId'] = [PFID]
# давление
ds1.attrs['PressureValue'] = [PV]
# nxyz (nx ny nz)
ds1.attrs['nx'] = [X]
ds1.attrs['ny'] = [Y]
ds1.attrs['nz'] = [Z]

hdf.close()