This is the first tutorial providded for the PCA package. In it, the LD based chaos indicators are calcualted for the non-dimensional double pendulum using an adaptive step ODE DOP 8(5) solver.


In [1]:
# Import libraries
import numpy as np
from numba import cuda, float64
import math
import time
import sys
import os
try:
    from chaoticus import (create_solver_kernel_adaptive,
                                 neigh_gen,
                                 chaos_indicators)
except ImportError:
    print("Error: Could not import functions from 'chaoticus.py'.")
    print("Make sure the file exists and is in the correct path.")
    sys.exit(1)

The Hamiltonian system we will use is the dimensionless double pendulum, whose Hamiltonian is given by:


$$
    \mathcal{H}(\boldsymbol{\theta},\mathbf{p}) = \mathcal{T} + \mathcal{V} = \dfrac{1}{2} \mathbf{p}^T M^{-1}(\cos \Delta \theta) \, \mathbf{p} - \alpha(1+\sigma) \cos \theta_1 - \cos \theta_2
$$

where $\alpha = \dfrac{l_1}{l_2}$, $\sigma = \dfrac{m_1}{m_2}$, $\Delta \theta = \theta_1 - \theta_2$ and:

$$
    M^{-1}(x) = \dfrac{1}{1+\sigma - x^2}\begin{bmatrix}
        \dfrac{1}{\alpha^2} & -\dfrac{x}{\alpha} \\[.35cm]
        -\dfrac{x}{\alpha} & 1+\sigma
        \end{bmatrix}
$$

Then, Hamilton's equations of motion are:

$$
    \begin{cases}
        \dot{\boldsymbol{\theta}} = M^{-1}(\cos \Delta \theta) \, \mathbf{p} \\ 
        \dot{\mathbf{p}} = \dfrac{\sin \Delta \theta}{2} \mathbf{p}^T C(\cos \Delta \theta) \, \mathbf{p} \begin{bmatrix}
            1 \\
            -1
        \end{bmatrix} - \begin{bmatrix}
        \alpha(1+\sigma) \sin\theta_1 \\[.1cm]
        \sin\theta_2
        \end{bmatrix}
    \end{cases} 
$$

In [2]:
"""
Define the potential energy function
"""
def V(theta1, theta2, a, s):
    return - a * (s + 1) * np.cos(theta1) - np.cos(theta2)

"""
Define system Hamiltonian for CPU usage
"""
def H_cpu(theta1, theta2, p1, p2, a, s):

    delta = np.cos(theta1 - theta2)
    c = 1 + s - delta**2
    B_inv_matrix = np.array([
        [1 / (a**2 * c), -delta / (a * c)],
        [-delta / (a * c), (1 + s) / c]
    ])
    p = np.array([p1, p2])
    kinetic_energy = 0.5 * (p[0] * (B_inv_matrix[0, 0] * p[0] + B_inv_matrix[0, 1] * p[1]) +
                           p[1] * (B_inv_matrix[1, 0] * p[0] + B_inv_matrix[1, 1] * p[1]))
    potential_energy = V(theta1, theta2, a, s)
    return kinetic_energy + potential_energy


"""
Define Hamilton's equations of motion for GPU usage
"""
@cuda.jit(device=True, inline=True)
def double_pendulum_ode_fixed(t, Y, dYdt, params):

    # --- Unpack parameters ---
    a, s = params # Expects params to be a tuple e.g., (a_value, s_value)

    # --- Get current state variables ---
    theta1 = Y[0]
    theta2 = Y[1]
    p1     = Y[2]
    p2     = Y[3]
    # LD_var = Y[4] # Only needed if its value affects derivatives

    # --- Calculate intermediate terms ---
    delta_theta = theta1 - theta2
    cos_delta = math.cos(delta_theta)
    sin_delta = math.sin(delta_theta)

    beta = a**2 * (1 + s)
    mu   = 1.0 / (1 + s)
    B1 = beta * (1 - mu * cos_delta * cos_delta)

    # --- Angle derivatves ---
    dtheta1dt = (p1 - a * p2 * cos_delta) / B1
    dtheta2dt = (beta * p2 - a * p1 * cos_delta) / B1

    denom = beta - a**2 * cos_delta * cos_delta
    C1 = sin_delta / (2 * denom * denom)

    # --- Calculate derivatives for momenta ---
    dp1dt = -2 * a * C1 * (beta * p2 - a * p1 * cos_delta) * (p1 - a * p2 * cos_delta) - (beta / a) * math.sin(theta1)
    dp2dt =  2 * a * C1 * (beta * p2 - a * p1 * cos_delta) * (p1 - a * p2 * cos_delta) - math.sin(theta2)
    

    # --- Calculate derivative for LD variable ---
    dLDdt = math.sqrt(abs(dtheta1dt)) + math.sqrt(abs(dtheta2dt)) + math.sqrt(abs(dp1dt)) + math.sqrt(abs(dp2dt))

    # --- Store derivatives in output array ---
    dYdt[0] = dtheta1dt
    dYdt[1] = dtheta2dt
    dYdt[2] = dp1dt
    dYdt[3] = dp2dt
    dYdt[4] = dLDdt

In order to obtain the initial conditions that will be simualted, we follow a Monte Carlo approach taking into account that our Poincaré section is defined as:

$$
    \Sigma\left(\mathcal{H}_0\right) = \left\lbrace (\theta_1,\theta_2,p_1,p_2) \in \mathbb{R}^4 \; \Big| \; \mathcal{H} = \mathcal{H}_0 \;,\; \theta_2 = 0 \;,\;  (1 + \sigma) \, p_2 - \dfrac{\cos\theta_1}{\alpha} p_1 > 0 \right\rbrace
$$

Then, we generate a random pair of values for $(\theta_1, p_1)$ and calculate $p_2$ solving the following equation:

$$
    \dfrac{1 + \sigma}{1 + \sigma - \cos^2 \theta_1} \; p_2^2 - \dfrac{2p_1\cos\theta_1}{\alpha (1 + \sigma - \cos^2 \theta_1)} \; p_2 + \dfrac{p_1^2}{\alpha^2 (1 + \sigma - \cos^2 \theta_1)} + 2\left( \mathcal{V}(\theta_1, 0) - \mathcal{H}_0 \right) = 0
$$

and in order to select the correct $p_2$ value (the equation will yield two possible solutions), we take the one that satifies the condition:

$$
    (1 + \sigma) \, p_2 - \dfrac{\cos\theta_1}{\alpha} p_1 > 0
$$

In [3]:
"""
Function to calculate the p2 momentum
"""
def P2_calc(Theta1, Theta2, P1, E, a, s):

    P2 = np.zeros(Theta1.size)
    n_ics = Theta1.size
    for i in range(n_ics):
        v_energy = V(Theta1[i], Theta2[i], a, s)
        aux_term = 2.0 * (v_energy - E)
        A = ((1 + s) / (1 + s - (np.cos(Theta1[i]))**2))
        B = ((2 * P1[i] * np.cos(Theta1[i])) / (a * (1 + s - (np.cos(Theta1[i]))**2)))
        C = (P1[i]**2 / (a**2 * (1 + s - (np.cos(Theta1[i]))**2))) + aux_term
        discriminant = B**2 - 4 * A * C
        if discriminant >= 0:
            p2_plus = (B + np.sqrt(discriminant)) / (2 * A)
            p2_minus = (B - np.sqrt(discriminant)) / (2 * A)
            aux1 = (1 + s) * p2_plus - (np.cos(Theta1[i]) / a) * P1[i]
            aux2 = (1 + s) * p2_minus - (np.cos(Theta1[i]) / a) * P1[i]
            if aux1 > 0:
                P2[i] = p2_plus
            elif aux2 > 0:
                P2[i] = p2_minus
            else:
                P2[i] = np.nan
        else:
            P2[i] = np.nan
    return P2

"""
Function to calculate the limiting value of p1 in the energy surface
"""
def p1_limit(E, a, s, Theta2_Poincare):

    v_energy = V(0, Theta2_Poincare, a, s)
    aux_term = 2.0 * (v_energy - E)
    beta = a**2 * (1 + s)
    mu = 1 / (1 + s)
    
    fix_value = Theta2_Poincare
    numerator = aux_term * beta * (1 - mu * np.cos(fix_value)**2)
    denominator = (a**2 * np.cos(fix_value)**2) / beta - 1
    
    p1_plus = np.sqrt(numerator / denominator)
    p1_minus = -np.sqrt(numerator / denominator)
    
    return np.array([p1_plus, p1_minus])

"""
Function to generate random initial conditions in the theta1-p1 plane
"""
def random_ics(num_ics, a, s, E, Theta2_Poincare):

    np.random.seed(42)
    p1_lims = p1_limit(E, a, s, Theta2_Poincare)
    ics = np.zeros((num_ics, 5))
    i = 0
    while i < num_ics:
        theta_1 = np.random.rand() * np.pi
        p_1 = (2 * p1_lims[0] * np.random.rand() - p1_lims[0])
        p_2 = P2_calc(np.array([theta_1]), np.array([Theta2_Poincare]), 
                      np.array([p_1]), E, a, s)[0]
        if not np.isnan(p_2):
            ics[i, :] = np.array([theta_1, Theta2_Poincare, p_1, p_2, 0.0])
            i += 1 
    return ics

In [4]:
"""
Function to integrate trajectories on the GPU
"""
def integrate_trajectories_adaptive(ics, solver_kernel, params_tuple, num_vars,
                                     t_final, tol, dt_initial, max_steps):
    """
    Integrates multiple trajectories in parallel on the GPU using a provided
    pre-compiled *adaptive-step* solver kernel.

    Args:
        ics (np.ndarray): A 2D NumPy array of initial conditions, shape (n_ics, num_vars).
        solver_kernel: The Numba CUDA kernel function for the *adaptive-step* solver,
                       obtained by calling create_solver_kernel_adaptive(ode_func, num_vars).
        params_tuple (tuple): A tuple containing the parameters required by the
                              ODE function (which the solver_kernel will pass to it).
        num_vars (int): The number of variables in the ODE system (must match ics.shape[1]).
        t_final (float): The target final time for integration.
        tol (float): The absolute error tolerance for adaptive step control.
        dt_initial (float): The initial guess for the time step.
        max_steps (int): The maximum number of adaptive steps allowed per trajectory.

    Returns:
        np.ndarray: A 2D NumPy array containing the final states of the trajectories
                    after integration, shape (n_ics, num_vars). Note that the state
                    corresponds to the time of the last accepted step, which might
                    be less than t_final if max_steps was reached.
    """
    n_ics = ics.shape[0] # Number of initial conditions
    if ics.shape[1] != num_vars:
        raise ValueError(f"Number of variables in ics ({ics.shape[1]}) does not match num_vars ({num_vars})")

    print(f"Starting adaptive integration for {n_ics} trajectories up to t={t_final}...")
    start_time = time.time()

    # --- Prepare GPU Data ---
    print("  Transferring initial conditions to GPU...")
    Y0_device = cuda.to_device(ics)

    print(f"  Allocating output array on GPU ({n_ics}x{num_vars})...")
    Y_out_device = cuda.device_array((n_ics, num_vars), dtype=np.float64)

    # --- Configure CUDA Launch ---
    blockdim = 256 # Or choose a suitable block dimension
    griddim = (n_ics + blockdim - 1) // blockdim
    print(f"  CUDA Launch Config: Grid={griddim}, Block={blockdim}")

    # --- Execute Adaptive Kernel ---
    t0 = 0.0 # Initial time
    print(f"  Launching adaptive solver kernel (tol={tol}, dt_initial={dt_initial}, max_steps={max_steps})...")
    # Call the PASSED-IN adaptive solver_kernel with the correct signature
    solver_kernel[griddim, blockdim](
        Y0_device,      # Initial states
        t0,             # Initial time
        t_final,        # Target final time
        params_tuple,   # Parameters tuple for the ODE function
        tol,            # Error tolerance
        dt_initial,     # Initial dt guess
        max_steps,      # Max allowed steps
        Y_out_device    # Output array
    )
    cuda.synchronize() # Wait for the kernel to complete
    kernel_end_time = time.time()
    print(f"  Kernel execution finished in {kernel_end_time - start_time:.4f} seconds.")

    # --- Retrieve Results ---
    print("  Copying results back to host...")
    final_states = Y_out_device.copy_to_host()
    copy_end_time = time.time()
    print(f"  Data copy finished in {copy_end_time - kernel_end_time:.4f} seconds.")

    total_time = time.time() - start_time
    print(f"Total integration function time: {total_time:.6f} seconds.")
    # Note: Time per IC isn't as straightforward as fixed-step due to varying steps per trajectory
    print(f"Average integration time per IC: {total_time / n_ics:.6f} seconds.") # Less meaningful here

    return final_states


In [None]:
"""
Main function
"""

def main():

    # --- Simulation & System Parameters ---
    a = 1.0 # alpha value 
    s = 1.0 # sigma value
    E = 10.0 # energy value
    num_ics = 1000000 # number of initial conditions

    params_tuple = (a, s) # Bundle parameters for the kernel
    NUM_VARIABLES = 5 # theta1, theta2, p1, p2, LD_var

    # --- Integration Parameters ---
    Theta2_Poincare = 0.0 # Poincare section plane for ICs
    t0 = 0.0
    t_final = 1000.0

    # --- Adaptive Integration Parameters ---
    abs_tolerance = 1e-10          # Example tolerance
    dt_initial_guess = 0.01        # Example initial step guess
    max_steps_allowed = 2000000    # Example max steps safeguard

    # --- Chaos Indicator Parameters ---
    PERTURBATION_D = 1e-5 # Perturbation distance for neighbors
    dims_to_perturb = [0, 2] # Indices of ICs to perturb (e.g., theta1, p1)
    num_dims_perturbed = len(dims_to_perturb)
    num_neighbors_per_ic = 2 * num_dims_perturbed

    output = False # Set to True to save results to files
    if output is True:
        # --- Output Configuration ---
        output_dir = "pendulum_results_fixed"
        indicators_dir = os.path.join(output_dir, "Indicators")
        aux_info_dir = os.path.join(output_dir, "Aux_Info")
        ics_dir = os.path.join(output_dir, "ICS")
        # Create directories if they don't exist
        os.makedirs(indicators_dir, exist_ok=True)
        os.makedirs(aux_info_dir, exist_ok=True)
        os.makedirs(ics_dir, exist_ok=True)
        output_prefix = f"dp_fixed_a{a}_s{s}_E{E}_n{num_ics}"

        print("--- Double Pendulum Simulation (Adaptive Step) ---")
        print(f"Parameters: a={a}, s={s}, E={E}")
        print(f"Integration: t_final={t_final}, tol={abs_tolerance}, dt_init={dt_initial_guess}, max_steps={max_steps_allowed}")
        print(f"ICs: num_ics={num_ics}, plane theta2={Theta2_Poincare}")
        print(f"Chaos Params: d={PERTURBATION_D}, perturbing dims={dims_to_perturb}")
        print(f"Output Dir: {output_dir}")

    # --- Generate Initial Conditions ---
    ics = random_ics(num_ics, a, s, E, Theta2_Poincare)
    if ics.shape[0] == 0:
        print("Error: No valid initial conditions generated. Exiting.")
        sys.exit(1)
    num_ics = ics.shape[0] # Update num_ics if fewer were generated

    # --- Create the Fixed-Step Solver Kernel ---
    print("\nCreating adaptive-step solver kernel...")
    adaptive_step_solver = create_solver_kernel_adaptive(double_pendulum_ode_fixed, NUM_VARIABLES)
    print("Adaptive kernel created.")


    # --- Integrate Main Trajectories --- 
    print("\nIntegrating main trajectories (adaptive)...")
    # Use the generalized ADAPTIVE integration function
    final_results = integrate_trajectories_adaptive( # Changed function call
        ics=ics,
        solver_kernel=adaptive_step_solver, # Pass the adaptive kernel
        params_tuple=params_tuple,
        num_vars=NUM_VARIABLES,
        t_final=t_final,                 # Pass adaptive parameters
        tol=abs_tolerance,
        dt_initial=dt_initial_guess,
        max_steps=max_steps_allowed
    )
    print("Main integration finished.")

    # Extract LD results (assuming LD is the last variable)
    LD_main_results = final_results[:, -1]

    # --- Energy Conservation Check ---
    print("\nChecking energy conservation...")
    initial_energies = np.array([H_cpu(ic[0], ic[1], ic[2], ic[3], a, s) for ic in ics])
    final_energies = np.array([H_cpu(state[0], state[1], state[2], state[3], a, s) for state in final_results])
    energy_diff = np.abs(final_energies - initial_energies)
    max_energy_diff = np.max(energy_diff) if energy_diff.size > 0 else np.nan
    print(f"  Max absolute energy difference: {max_energy_diff:.2e}")

    """
    # --- Generate Neighbor Initial Conditions ---
    print("\nGenerating neighbor initial conditions...")
    all_neighbors_h = []
    for i in range(num_ics):
        # Use the generalized neighbor function with specified d
        neighbors_for_ic = neigh_gen(ics[i], dims_to_perturb, d=PERTURBATION_D)
        all_neighbors_h.append(neighbors_for_ic)
    # Stack into a single array for batch processing
    neighbors_flat_h = np.vstack(all_neighbors_h)
    print(f"Generated {neighbors_flat_h.shape[0]} neighbor ICs.")

    # --- Integrate Neighbor Trajectories --- 
    print("\nIntegrating neighbor trajectories (adaptive)...")
    # Use the generalized ADAPTIVE integration function
    neighbor_results = integrate_trajectories_adaptive( # Changed function call
        ics=neighbors_flat_h,
        solver_kernel=adaptive_step_solver, # Pass the adaptive kernel
        params_tuple=params_tuple,
        num_vars=NUM_VARIABLES,
        t_final=t_final,                 # Pass adaptive parameters
        tol=abs_tolerance,
        dt_initial=dt_initial_guess,
        max_steps=max_steps_allowed
    )
    print("Neighbor integration finished.")

    # Extract and reshape neighbor LD results
    LD_neighbors_flat = neighbor_results[:, -1] # Get last column (LD)
    # Reshape based on the number of neighbors generated per IC
    LD_neighbors_grouped = LD_neighbors_flat.reshape(-1, num_neighbors_per_ic)

    # --- Compute Chaos Indicators ---
    print("\nCalculating chaos indicators...")
    D = np.zeros(num_ics)
    R = np.zeros(num_ics)
    C = np.zeros(num_ics)
    S = np.zeros(num_ics)
    calculation_errors = 0
    for i in range(num_ics):
        L_center = LD_main_results[i]
        L_neighs = LD_neighbors_grouped[i]
        # Use the generalized chaos indicator function with specified d
        D[i], R[i], C[i], S[i] = chaos_indicators(L_center, L_neighs, d=PERTURBATION_D)
    print("Chaos indicators calculation finished.")

    if output is True:
        # --- Save Results --- # Changed section
        print("\nSaving results...")
        output_file_indicators = os.path.join(indicators_dir, f"{output_prefix}_indicators.dat")
        output_file_aux = os.path.join(aux_info_dir, f"{output_prefix}_aux.dat")
        output_file_ics = os.path.join(ics_dir, f"{output_prefix}_ics.dat")

        # Save Chaos Indicators (D, R, C, S)
        results_indicators = np.column_stack((D, R, C, S))
        np.savetxt(output_file_indicators, results_indicators, delimiter=',', header='D,R,C,S', comments='')
        print(f"  Saved indicators to: {output_file_indicators}")

        # Save Auxiliary Info - Modified parameters and header
        results_aux = np.array([[max_energy_diff, np.nan, np.nan, num_ics, abs_tolerance, dt_initial_guess, max_steps_allowed, PERTURBATION_D]])
        np.savetxt(output_file_aux, results_aux, delimiter=',',
                   header='max_energy_diff,time_per_ic(NaN),total_time(NaN),num_ics,tolerance,dt_initial,max_steps,perturbation_d', # Changed header
                   comments='')
        print(f"  Saved auxiliary info to: {output_file_aux}")

        # Save Main Initial Conditions
        np.savetxt(output_file_ics, ics, delimiter=',')
        print(f"  Saved initial conditions to: {output_file_ics}")
    """
    print("\n--- Simulation Complete ---")

"""
Call main function
"""
if __name__ == "__main__":
    # Check for CUDA availability
    if not cuda.is_available():
        print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
        print("!!! CUDA is not available or not detected! !!!")
        print("!!! This script requires a CUDA-enabled GPU!!!")
        print("!!! and correctly installed CUDA drivers   !!!")
        print("!!! and Numba.                           !!!")
        print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
        sys.exit(1) # Exit if no CUDA
    else:
        print(f"Found CUDA device: {cuda.get_current_device().name.decode()}")
        main() # Run the main function