In [1]:
import os
import re
import glob
import json
import concurrent.futures

import numpy as np
import pandas as pd
import pyvista as pv
import plotly.graph_objects as go
from plotly.subplots import make_subplots

import gradio as gr

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
###############################################################################
#                             1) CONFIG & HELPERS
###############################################################################
def load_json_config(json_path="export_modified.json"):
    """Load the JSON config file and return as dict."""
    if not os.path.isfile(json_path):
        print(f"Warning: JSON config '{json_path}' not found. Returning empty dict.")
        return {}
    with open(json_path, "r") as f:
        return json.load(f)


def parse_time_from_folder(folder_name):
    """
    Attempt to parse a numeric 'time' or 'index' from the folder name,
    e.g. 'sim_100' -> 100. If parsing fails, returns 0.
    """
    match = re.search(r"(\d+)$", folder_name)
    if match:
        return float(match.group(1))
    else:
        return 0.0


def list_subfolders(root):
    """Return sorted list of subfolder paths under 'root'."""
    if not os.path.isdir(root):
        return []
    all_entries = [os.path.join(root, d) for d in os.listdir(root)]
    subdirs = [d for d in all_entries if os.path.isdir(d)]
    return sorted(subdirs)


In [3]:
###############################################################################
#       2) PARALLEL READING OF step_*.vtu FOR METRIC (TIME SERIES)
###############################################################################


def is_base_step_file(fn):
    """
    Return True if 'fn' is a 'base' step file (step_XX.vtu) and
    does NOT contain '_points', '_surf', '_contact', etc.
    """
    basename = os.path.basename(fn)
    if any(sub in basename for sub in ["_points", "_surf", "_contact"]):
        return False
    return True


def process_single_vtu_for_metric(filename):
    """
    Read a single .vtu file with PyVista, compute a "dummy" metric
    (the mean of the first point_data or cell_data array).
    Return (step_index, metric_value).
    """
    mesh = pv.read(filename)

    # Parse the step number from 'step_<n>.vtu'
    step_match = re.search(r"step_(\d+)\.vtu$", filename)
    if step_match:
        step_index = int(step_match.group(1))
    else:
        step_index = -1  # fallback

    # Dummy metric: If the mesh has point_data arrays, use the first one’s mean.
    # Otherwise, check cell_data. If none, 0.0
    if mesh.point_data:
        arr_name = list(mesh.point_data.keys())[0]
        metric_value = mesh.point_data[arr_name].mean()
    elif mesh.cell_data:
        arr_name = list(mesh.cell_data.keys())[0]
        metric_value = mesh.cell_data[arr_name].mean()
    else:
        metric_value = 0.0

    return step_index, metric_value


def read_all_steps_vtu_parallel_for_metric(directory):
    """
    Find all base step_*.vtu (excluding _points, _surf, etc.) and
    read them in parallel to build a DataFrame of 'step' vs. 'metric'.
    """
    pattern = os.path.join(directory, "step_*.vtu")
    all_vtu_files = glob.glob(pattern)
    base_vtu_files = sorted(f for f in all_vtu_files if is_base_step_file(f))

    if not base_vtu_files:
        return pd.DataFrame(columns=["step", "metric"])

    results = []
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = {
            executor.submit(process_single_vtu_for_metric, fn): fn
            for fn in base_vtu_files
        }
        for future in concurrent.futures.as_completed(futures):
            fn = futures[future]
            try:
                step_idx, val = future.result()
                results.append((step_idx, val))
            except Exception as exc:
                print(f"Error reading {fn}: {exc}")

    # Sort by step index
    results.sort(key=lambda x: x[0])
    df = pd.DataFrame(results, columns=["step", "metric"])
    return df

In [4]:
###############################################################################
#        3) READ A SINGLE .vtu FOR GEOMETRY + DISPLACEMENT
###############################################################################
def pick_one_vtu_file_for_geometry(directory, step_number=0):
    """
    Attempt to find 'step_<step_number>.vtu' in directory, else fallback to first available.
    """
    specific = os.path.join(directory, f"step_{step_number}.vtu")
    if os.path.isfile(specific):
        return specific

    # fallback: pick the first base step file
    pattern = os.path.join(directory, "step_*.vtu")
    base_vtu_files = sorted(f for f in glob.glob(pattern) if is_base_step_file(f))
    if base_vtu_files:
        return base_vtu_files[0]
    return None


def read_vtu_for_geometry(filename):
    """
    Read a .vtu (via PyVista) to extract:
     - points
     - displacement (if 'solution' in point_data)
    Return (points, displacement).
    """
    if not filename or (not os.path.isfile(filename)):
        print(f"Cannot read geometry from: {filename}")
        return None, None

    mesh = pv.read(filename)
    points = mesh.points  # (N, 3)

    displacement = None
    if "solution" in mesh.point_data:
        displacement = mesh.point_data["solution"]

    return points, displacement


In [5]:
###############################################################################
#        4) READ STRESS CSV
###############################################################################
def read_stress_csv(csv_file):
    """
    Suppose columns are: sigma_xx, sigma_yy, sigma_zz, tau_xy, tau_yz, tau_zx
    If they are unnamed, rename them. Adjust as needed.
    """
    if not os.path.isfile(csv_file):
        return pd.DataFrame()

    # If whitespace-delimited or no header, adjust accordingly:
    # Example assumes whitespace-delimited with no header:
    df = pd.read_csv(csv_file, header=None, delim_whitespace=True)
    if df.shape[1] == 6:
        df.columns = ["sigma_xx", "sigma_yy", "sigma_zz", "tau_xy", "tau_yz", "tau_zx"]
    return df

In [6]:
###############################################################################
#        5) DISPLACE POINTS AND BUILD 3D SCATTER
###############################################################################


def displace_points(points, displacement):
    """Add displacement to each point. If displacement is None, return points as-is."""
    if displacement is None:
        return points
    return points + displacement


def build_3d_scatter_stress(displaced_points, df_stress):
    """
    Create a Plotly Scatter3d for the displaced geometry, colored by e.g. sigma_zz.
    If df_stress is empty or misaligned, we'll fallback to zeros.
    """
    n_points = displaced_points.shape[0]
    # Pick a stress component to plot
    if (
        not df_stress.empty
        and "sigma_zz" in df_stress.columns
        and len(df_stress) >= n_points
    ):
        stress_values = df_stress["sigma_zz"].values[:n_points]
    else:
        # fallback if no data or mismatch
        stress_values = np.zeros(n_points)

    # Build the scatter
    x, y, z = displaced_points[:, 0], displaced_points[:, 1], displaced_points[:, 2]
    scatter_3d = go.Scatter3d(
        x=x,
        y=y,
        z=z,
        mode="markers",
        marker=dict(
            size=3,
            color=stress_values,
            colorscale="Turbo",
            showscale=True,
            colorbar=dict(title="σ_zz"),
        ),
        name="Stress Heatmap",
    )
    return scatter_3d

In [7]:
###############################################################################
#        6) BUILD ONE FRAME (2D ENERGY & METRIC + 3D STRESS)
###############################################################################


def build_figure_for_folder(folder_path, config_data):
    """
    Read the necessary data from `folder_path` (energy.csv, step_*.vtu, stress_mat.csv),
    and build a single Plotly figure with subplots:
      - col=1: Energy vs i
      - col=2: Metric vs step
      - col=3: 3D scatter of the displaced geometry, colored by sigma_zz
    """

    # 1) Parse time/index from the folder name
    folder_name = os.path.basename(folder_path)
    sim_time = parse_time_from_folder(folder_name)

    # 2) Read time-series metric from step_*.vtu
    df_vtu = read_all_steps_vtu_parallel_for_metric(folder_path)

    # 3) Read energy.csv
    csv_energy = os.path.join(folder_path, "energy.csv")
    if os.path.isfile(csv_energy):
        df_energy = pd.read_csv(csv_energy)
    else:
        df_energy = pd.DataFrame()

    # 4) Geometry + displacement
    geom_file = pick_one_vtu_file_for_geometry(folder_path, step_number=0)
    points, displacement = read_vtu_for_geometry(geom_file)
    if points is None:
        # If no geometry, return a simple figure with a message
        fig = go.Figure()
        fig.add_annotation(text="No geometry found!", showarrow=False)
        fig.update_layout(title=f"Folder: {folder_name}")
        return fig

    displaced_points = displace_points(points, displacement)

    # 5) Stress
    stress_file = os.path.join(folder_path, "stress_mat.csv")
    df_stress = read_stress_csv(stress_file)
    scatter_3d = build_3d_scatter_stress(displaced_points, df_stress)

    # 6) Make the subplot figure
    fig = make_subplots(
        rows=1,
        cols=3,
        specs=[[{}, {}, {"type": "scene"}]],
        subplot_titles=["Energy CSV", "VTU Metric", "Stress Heatmap"],
        horizontal_spacing=0.07,
    )

    # (a) Energy lines in col=1
    if not df_energy.empty and "i" in df_energy.columns:
        numeric_cols = df_energy.select_dtypes(include=["number"]).columns
        numeric_cols = [c for c in numeric_cols if c != "i"]
        for col_name in numeric_cols:
            fig.add_trace(
                go.Scatter(
                    x=df_energy["i"],
                    y=df_energy[col_name],
                    mode="lines",
                    name=f"energy_{col_name}",
                ),
                row=1,
                col=1,
            )

    # (b) vtu metric in col=2
    if not df_vtu.empty:
        fig.add_trace(
            go.Scatter(
                x=df_vtu["step"],
                y=df_vtu["metric"],
                mode="lines+markers",
                name="VTU metric",
            ),
            row=1,
            col=2,
        )

    # (c) 3D scatter in col=3
    fig.add_trace(scatter_3d, row=1, col=3)

    fig.update_layout(
        width=1500,
        height=700,
        title=(
            f"Simulation: {folder_name} (time={sim_time})<br>"
            f"Units: L={config_data.get('units', {}).get('length', '?')} "
            f"M={config_data.get('units', {}).get('mass', '?')} "
            f"T={config_data.get('units', {}).get('time', '?')}"
        ),
    )
    return fig

In [8]:
###############################################################################
#        7) BUILD A SINGLE FIGURE WITH SUBPLOTS + FRAMES
###############################################################################


def build_figure_with_frames(all_data, config_data):
    """
    Make a figure with 1 row, 3 columns:
      col1: energy
      col2: metric
      col3: 3D scene
    Each simulation gets its own frame. We'll use a radio-button to switch frames.
    """

    # Prepare a subplot layout with a 3D scene in col=3
    fig = make_subplots(
        rows=1,
        cols=3,
        specs=[[{}, {}, {"type": "scene"}]],
        subplot_titles=["Energy CSV", "VTU Metric", "Stress Heatmap"],
        horizontal_spacing=0.07,
    )

    frames = []
    initial_data = []

    for idx, sim_info in enumerate(all_data):
        frame, frame_data = build_frame_for_sim(sim_info, idx)
        frames.append(frame)

        if idx == 0:
            # The first simulation is used as the default data
            initial_data = frame_data

    # Attach frames to the figure
    fig.frames = frames

    # Add the initial data to the figure in the correct subplot
    # We'll do a naive approach: the last item is the 3D scatter, everything else is 2D.
    for trace in initial_data[:-1]:
        # If the trace's x is presumably "df_energy['i']", we assume that's col=1,
        # otherwise it's the vtu metric => col=2
        # (You can customize your logic if you prefer more robust detection.)
        if "i[" in str(trace.x) or trace.name.endswith(": i"):
            fig.add_trace(trace, row=1, col=1)
        else:
            fig.add_trace(trace, row=1, col=2)

    # Then the last is 3D
    fig.add_trace(initial_data[-1], row=1, col=3)

    # Create update menus (radio or dropdown) to switch frames
    updatemenus = [
        dict(
            type="buttons",
            direction="down",  # or "right"
            x=1.25,
            y=0.5,
            showactive=True,
            buttons=[
                dict(
                    label=f"Sim #{idx + 1}",
                    method="animate",
                    args=[
                        [f"frame_{idx}"],
                        {
                            "mode": "immediate",
                            "frame": {"duration": 0, "redraw": True},
                            "transition": {"duration": 0},
                        },
                    ],
                )
                for idx in range(len(all_data))
            ],
        )
    ]

    fig.update_layout(
        updatemenus=updatemenus,
        title=(
            "Combined Simulations<br>"
            f"Config length: {config_data.get('units', {}).get('length', 'unknown')} | "
            f"mass: {config_data.get('units', {}).get('mass', 'unknown')} | "
            f"time: {config_data.get('units', {}).get('time', 'unknown')}<br>"
            "(Use the button to switch simulations.)"
        ),
        width=1500,
        height=700,
    )

    return fig


In [9]:
###############################################################################
#                          GRADIO INTERFACE LOGIC
###############################################################################


def show_folder_figure(selected_folder, config_file):
    """
    Callback for Gradio that, given a chosen folder path,
    builds the figure and returns its HTML as a string.
    """
    if not selected_folder:
        return "<p style='color:red;'>No folder selected.</p>"

    # Load config (or empty if not found)
    config_data = load_json_config(selected_folder + "/" + config_file)

    # Build the figure
    fig = build_figure_for_folder(selected_folder, config_data)

    # Return the HTML
    return fig.to_html(full_html=False, include_plotlyjs="cdn")


def generate_figure_html(output_root, config_path="export_modified.json"):
    """
    Execute all the steps, but instead of writing out a file, we return the
    Plotly HTML as a string for embedding in Gradio.
    """

    if not os.path.isdir(output_root):
        return (
            f"<p style='color:red;'>Error: Output folder '{output_root}' not found.</p>"
        )

    # 1) Load config
    config_data = load_json_config(config_path)

    # 2) Collect data from each subfolder
    all_data = []
    subfolders = sorted(os.listdir(output_root))
    for folder_name in subfolders:
        full_path = os.path.join(output_root, folder_name)
        if not os.path.isdir(full_path):
            continue

        print(f"--- Processing folder: {full_path} ---")

        # (A) Parse a "time" from the folder name (optional)
        sim_time = parse_time_from_folder(folder_name)

        # (B) Time-series metric: read all step_*.vtu in parallel
        df_vtu = read_all_steps_vtu_parallel_for_metric(full_path)

        # (C) Read energy.csv if present
        csv_energy = os.path.join(full_path, "energy.csv")
        if os.path.isfile(csv_energy):
            df_energy = pd.read_csv(csv_energy)
        else:
            df_energy = pd.DataFrame()

        # (D) Pick a single vtu for geometry
        geom_file = pick_one_vtu_file_for_geometry(full_path, step_number=0)
        points, displacement = read_vtu_for_geometry(geom_file)
        if points is None:
            # skip if no geometry
            continue
        displaced_points = displace_points(points, displacement)

        # (F) Read stress_mat.csv
        stress_file = os.path.join(full_path, "stress_mat.csv")
        df_stress = read_stress_csv(stress_file)

        # (G) Build 3D scatter
        scatter_3d = build_3d_scatter_stress(displaced_points, df_stress)

        sim_info = {
            "folder": folder_name,
            "sim_time": sim_time,
            "df_energy": df_energy,
            "df_vtu": df_vtu,
            "scatter_3d": scatter_3d,
        }
        all_data.append(sim_info)

    if not all_data:
        return "<p>No simulations found to plot.</p>"

    # 3) Build the figure
    fig = build_figure_with_frames(all_data, config_data)

    # 4) Return the figure as HTML (with PlotlyJS included from CDN)
    html_str = fig.to_html(full_html=False, include_plotlyjs="cdn")
    return html_str


def run_visualization(output_root, config_path="export_modified.json"):
    """
    This function is what Gradio will call upon the button click.
    It returns raw HTML, which will be displayed in a gr.HTML() component.
    """
    return generate_figure_html(output_root, config_path)

In [10]:
###############################################################################
#                                GRADIO APP
###############################################################################
with gr.Blocks(title="Combined Simulations - One Folder at a Time") as demo:
    gr.Markdown(
        r"""
        # Combined Simulations Viewer
        This Gradio app scans a root **output** folder for simulation subfolders, 
        then lets you pick one in a **radio** menu. For the chosen subfolder, 
        it shows:
        - **Left**: Energy vs. `i`  
        - **Middle**: VTU-derived metric vs. step  
        - **Right**: 3D scatter (displaced geometry) colored by \(\sigma_{zz}\).
        
        **Steps to Use**:
        1. Enter the path to your `output` folder (which contains `sim_1`, `sim_2`, etc.).
        2. (Optional) Enter the path to your `export_modified.json` config file.
        3. Click **Load** to populate the radio with available subfolders.
        4. Pick a folder from the radio to see the plot.
        """
    )

    with gr.Row():
        output_root = gr.Textbox(
            label="Output Root Folder",
            value=os.path.join(os.getcwd(), "output"),
            placeholder="Path containing subfolders with step_*.vtu, energy.csv, etc.",
        )
        config_file = gr.Textbox(
            label="Config JSON Path",
            value="export_modified.json",
            placeholder="Path to config JSON (optional)",
        )
        load_button = gr.Button("Load Subfolders")

    # 1) Show the list of subfolders in a radio
    folder_radio = gr.Radio(label="Available Subfolders", choices=[], interactive=True)

    # 2) Plot output
    out_html = gr.HTML()

    def load_subfolders(root_folder):
        subdirs = list_subfolders(root_folder)
        # Return them as the new "choices" for the radio
        return gr.update(
            choices=subdirs, value=None
        ), f"Found {len(subdirs)} subfolders."

    load_button.click(
        fn=load_subfolders,
        inputs=[output_root],
        outputs=[folder_radio, out_html],
        # we reset out_html with a status message
    )

    # On picking a folder in the radio, build the figure
    folder_radio.change(
        fn=show_folder_figure, inputs=[folder_radio, config_file], outputs=out_html
    )

if __name__ == "__main__":
    demo.launch()

* Running on local URL:  http://127.0.0.1:7860

To create a public link, set `share=True` in `launch()`.
