In [1]:
import vtk
from vtk.util.numpy_support import vtk_to_numpy


def rk4_integrate(probe_filter, point, step_size):
    def get_vector_at_point(point):
        # Set up a vtkPoints object and insert the current point
        vtk_point = vtk.vtkPoints()
        vtk_point.InsertNextPoint(point)

        # Create a vtkPolyData and set its points to the vtkPoints object
        polydata = vtk.vtkPolyData()
        polydata.SetPoints(vtk_point)

        # Set the input for the probe filter
        probe_filter.SetInputData(polydata)

        # Update the probe filter to interpolate the vector at the point
        probe_filter.Update()

        # Get the interpolated vector
        probed_data = probe_filter.GetOutput()
        vectors = probed_data.GetPointData().GetVectors()
        if vectors is not None:
            return vtk_to_numpy(vectors)[0]
        else:
            return None

    # Compute the k terms
    k1 = get_vector_at_point(point)
    if k1 is None:
        return None

    k2 = get_vector_at_point(
        [p + 0.5 * step_size * k for p, k in zip(point, k1)])
    if k2 is None:
        return None

    k3 = get_vector_at_point(
        [p + 0.5 * step_size * k for p, k in zip(point, k2)])
    if k3 is None:
        return None

    k4 = get_vector_at_point([p + step_size * k for p, k in zip(point, k3)])
    if k4 is None:
        return None

    # Calculate the next position based on the k terms
    next_point = [p + step_size * (k1i + 2 * k2i + 2 * k3i + k4i) / 6.0
                  for p, k1i, k2i, k3i, k4i in zip(point, k1, k2, k3, k4)]

    return next_point


def within_bounds(point, bounds):
    return all(bounds[i] <= point[i//2] <= bounds[i+1] for i in range(0, len(bounds), 2))


def trace_streamline(seed_point, probe_filter, bounds, step_size=0.05, max_steps=1000):
    streamline_points = [seed_point]
    current_point = seed_point

    # Forward integration
    for _ in range(max_steps):
        next_point = rk4_integrate(probe_filter, current_point, step_size)
        if next_point is None or not within_bounds(next_point, bounds):
            break
        streamline_points.append(next_point)
        current_point = next_point

    current_point = seed_point
    # Backward integration
    for _ in range(max_steps):
        next_point = rk4_integrate(probe_filter, current_point, -step_size)
        if next_point is None or not within_bounds(next_point, bounds):
            break
        streamline_points.insert(0, next_point)
        current_point = next_point

    return streamline_points


# Prepare the VTK objects
reader = vtk.vtkXMLImageDataReader()
reader.SetFileName("tornado3d_vector.vti")
reader.Update()
vector_field = reader.GetOutput()

probe_filter = vtk.vtkProbeFilter()
probe_filter.SetSourceData(vector_field)

# seed_point = (0, 0, 7)
seed_point = list(map(float, input("Enter the 3D seed location (x,y,z): ").split(',')))
bounds = vector_field.GetBounds()

# Perform the streamline tracing
streamline = trace_streamline(seed_point, probe_filter, bounds)

# Convert the streamline points to VTK objects and save
points_vtk = vtk.vtkPoints()
lines_vtk = vtk.vtkCellArray()

for i, point in enumerate(streamline):
    points_vtk.InsertNextPoint(point)
    if i > 0:
        line = vtk.vtkLine()
        line.GetPointIds().SetId(0, i - 1)
        line.GetPointIds().SetId(1, i)
        lines_vtk.InsertNextCell(line)

polydata = vtk.vtkPolyData()
polydata.SetPoints(points_vtk)
polydata.SetLines(lines_vtk)

writer = vtk.vtkXMLPolyDataWriter()
writer.SetFileName("streamline.vtp")
writer.SetInputData(polydata)
writer.Write()

print("Streamline written to 'streamline.vtp'")

Streamline written to 'streamline.vtp'


In [1]:
import vtk
from vtk.util.numpy_support import vtk_to_numpy


def compute_vector_at_location(probe, location):
    # Create a vtkPoints instance and insert the location
    vtk_pts = vtk.vtkPoints()
    vtk_pts.InsertNextPoint(location)

    # Establish a vtkPolyData object with the points
    poly_data = vtk.vtkPolyData()
    poly_data.SetPoints(vtk_pts)

    # Assign the poly data to the probe
    probe.SetInputData(poly_data)
    probe.Update()

    # Extract the vector data from the probe's output
    output_data = probe.GetOutput()
    vector_data = output_data.GetPointData().GetVectors()
    return vtk_to_numpy(vector_data)[0] if vector_data else None


def runge_kutta_integration(probe, start, h):
    # Initial vector calculation
    initial_vector = compute_vector_at_location(probe, start)
    if initial_vector is None:
        return None

    intermediate_vectors = []
    for i in range(1, 4):
        factor = 0.5 if i < 3 else 1
        point = [x + factor * h * v for x,
                 v in zip(start, intermediate_vectors[-1] if i > 1 else initial_vector)]
        vector = compute_vector_at_location(probe, point)
        if vector is None:
            return None
        intermediate_vectors.append(vector)

    # Combine the vectors to calculate the next position
    k1, k2, k3, k4 = initial_vector, *intermediate_vectors
    final_position = [x + h * (k1 + 2*k2 + 2*k3 + k4) /
                      6 for x, k1, k2, k3, k4 in zip(start, k1, k2, k3, k4)]
    return final_position


def is_within_limits(point, limit):
    return all(limit[i] <= point[i // 2] <= limit[i + 1] for i in range(0, len(limit), 2))


def generate_streamline(initial_point, probe, limit, h=0.05, max_iter=1000):
    points = [initial_point]
    next_point = initial_point

    # Forward tracing
    for _ in range(max_iter):
        next_point = runge_kutta_integration(probe, next_point, h)
        if next_point is None or not is_within_limits(next_point, limit):
            break
        points.append(next_point)

    # Reset for backward tracing
    next_point = initial_point
    for _ in range(max_iter):
        next_point = runge_kutta_integration(probe, next_point, -h)
        if next_point is None or not is_within_limits(next_point, limit):
            break
        points.insert(0, next_point)

    return points


# Setup VTK for reading and probing data
data_reader = vtk.vtkXMLImageDataReader()
data_reader.SetFileName("tornado3d_vector.vti")
data_reader.Update()
field_data = data_reader.GetOutput()

probe = vtk.vtkProbeFilter()
probe.SetSourceData(field_data)

seed = list(map(float, input("Enter the 3D seed location (x,y,z): ").split(',')))
boundary = field_data.GetBounds()

# Streamline computation
path = generate_streamline(seed, probe, boundary)

# Save the computed streamline to a VTK file
streamline_points = vtk.vtkPoints()
streamline_lines = vtk.vtkCellArray()

for index, coord in enumerate(path):
    streamline_points.InsertNextPoint(coord)
    if index > 0:
        conn_line = vtk.vtkLine()
        conn_line.GetPointIds().SetId(0, index - 1)
        conn_line.GetPointIds().SetId(1, index)
        streamline_lines.InsertNextCell(conn_line)

streamline_data = vtk.vtkPolyData()
streamline_data.SetPoints(streamline_points)
streamline_data.SetLines(streamline_lines)

output_writer = vtk.vtkXMLPolyDataWriter()
output_writer.SetFileName("streamline.vtp")
output_writer.SetInputData(streamline_data)
output_writer.Write()

print("Streamline successfully saved to 'streamline.vtp'")

Streamline successfully saved to 'streamline.vtp'
