Inspired By JohBrandstetter rep

https://github.com/brandstetter-johannes/LPSDA/tree/master



The Kuramoto-Sivashinsky (KS) equation is given by:

$
u_t + u_{xx} + u_{xxxx} + uu_x = 0,
$

where

$ u = u(x,t) $ is a function of space x and time t , $u_t$ is the partial derivative with respect to time, $ u_{xx}$ and $u_{xxxx}$ are the second and fourth partial derivatives with respect to space, and $uu_x$ represents the nonlinear advection term.



We can solve this equation by using the **pseudospectral method**, the derivatives are computed in the frequency domain by applying a **FFT** , mutliplying by the appropriate values and converting back to the spatia domain with **the inverse FFT**



In [1]:
%matplotlib inline
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
from typing import Optional
from scipy.integrate import solve_ivp
from scipy.fftpack import diff as psdiff

import os

In [7]:
from ssl import ALERT_DESCRIPTION_ILLEGAL_PARAMETER
import numpy as np
from scipy.integrate import solve_ivp
import random
import os
from typing import Tuple, Optional


def generate_coeffs(alpha: Optional[float] = None, beta: Optional[float] = None, gamma: Optional[float] = None) -> np.ndarray:
    """
    Generate or return provided coefficients.
    """
    if alpha is None:
        alpha = np.random.uniform(0.5, 1.5)  # coefficient for u_xxxx
    if beta is None:
        beta = np.random.uniform(0.5, 1.5)    # coefficient for u_xx
    if gamma is None:
        gamma = np.random.uniform(0.5, 1.5) # coefficient for uu_x

    return np.array([alpha, beta, gamma])

def generate_initial_params() -> Tuple[int, np.ndarray, np.ndarray, np.ndarray]:
    """
    Returns parameters for initial conditions.
    """
    N = 10
    lmin, lmax = 1, 3
    A = (np.random.rand(1, N) - 0.5)
    phi = 2.0 * np.pi * np.random.rand(1, N)
    l = np.random.randint(lmin, lmax, (1, N))
    return N, A, phi, l

def initial_conditions(x: np.ndarray, L: int, params: Optional[list] = None) -> np.ndarray:
    """
    Return initial conditions based on initial parameters.
    """
    if params is None:
        params = generate_initial_params()
    N, A, phi, l = params
    u = np.sum(A * np.sin((2 * np.pi * l * x[:, None] / L) + phi), -1)
    return u


def pseudospectral(t: float, u: np.ndarray, L: float, coeffs: np.ndarray) -> np.ndarray:
    """
    Compute spatial derivatives for KS equation using Fourier transform.
    """
    alpha, beta, gamma = coeffs
    u_x = psdiff(u, period=L)
    u_xx = psdiff(u, period=L, order=2)
    u_xxxx = psdiff(u, period=L, order=4)
    uu_x = u * u_x

    dudt = -gamma * uu_x - beta * u_xx - alpha * u_xxxx
    return dudt

def simulate_ks(L: int, N: int, T: float, dt: float, initial_condition: np.ndarray, coeffs: np.ndarray) -> np.ndarray:
    x = np.linspace(0, L, N)
    t = np.arange(0, T + dt, dt)
    print(f"Simulating KS equation for T={T}, dt={dt}, coeffs={coeffs}")
    sol = solve_ivp(fun=pseudospectral, t_span=[t[0], t[-1]], y0=initial_condition, method='Radau', t_eval=t, args=(L, coeffs))
    print(f"Simulation completed for T={T}")
    return sol.y.T  # Transpose to have shape (time, space)

def generate_input_output_pairs(solution: np.ndarray, delta_t: int, coeffs: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    inputs = []
    outputs = []
    num_time_steps = solution.shape[0]
    spatial_grid = np.linspace(0, L, solution.shape[1])

    for t in range(num_time_steps - delta_t):

        input_with_grid = np.stack((solution[t], spatial_grid), axis=-1)
        inputs.append(input_with_grid)
        outputs.append(solution[t + delta_t])


    return np.array(inputs), np.array(outputs)

def aggregate_datasets(num_initial_conditions, T, dt, L, N, output_dir, dataset_type, alpha=None, beta=None, gamma=None):
    all_inputs = []
    all_outputs = []

    # Generate coefficients once for the entire dataset
    print(f"Using coefficients: alpha={coeffs[0]}, beta={coeffs[1]}, gamma={coeffs[2]}")
    print(f"Generating {dataset_type} dataset with coefficients: alpha={coeffs[0]}, beta={coeffs[1]}, gamma={coeffs[2]}")

    for i in range(num_initial_conditions):
        print(f"Generating {dataset_type} dataset for initial condition {i + 1}/{num_initial_conditions}...")
        initial_params = generate_initial_params()
        initial_condition = initial_conditions(np.linspace(0, L, N), L, initial_params)
        solution = simulate_ks(L, N, T, dt, initial_condition, coeffs)
        inputs, outputs = generate_input_output_pairs(solution, delta_t, coeffs)
        all_inputs.append(inputs)
        all_outputs.append(outputs)

    all_inputs = np.concatenate(all_inputs, axis=0)
    all_outputs = np.concatenate(all_outputs, axis=0)

    save_dataset(all_inputs, os.path.join(output_dir, f'{dataset_type}_inputs_ex17.npy'))
    save_dataset(all_outputs, os.path.join(output_dir, f'{dataset_type}_outputs_ex17.npy'))

def sample_uniformly(inputs: np.ndarray, outputs: np.ndarray, num_samples: int) -> Tuple[np.ndarray, np.ndarray]:
    total_samples = inputs.shape[0]
    sampled_indices = random.sample(range(total_samples), num_samples)
    sampled_inputs = inputs[sampled_indices]
    sampled_outputs = outputs[sampled_indices]
    return sampled_inputs, sampled_outputs

def save_dataset(data: np.ndarray, filename: str):
    np.save(filename, data)
    print(f"Saved dataset to {filename}")


# Parameters
L = 100
N = 256
T_train = 3000
T_val_test = 1000
dt = 0.05  # time step for integration
delta_t = 1  # time step for input-output pairs
output_dir = './ks_data'
os.makedirs(output_dir, exist_ok=True)
num_initial_conditions_train = 5
num_initial_conditions_val = 2
num_initial_conditions_test = 1

# Example usage:
# To use random coefficients:
coeffs = [1.0977216  ,1.43569084 ,1.33350611]
print(coeffs)
#aggregate_datasets(num_initial_conditions_train, T_train, dt, L, N, output_dir, 'train',alpha=coeffs[0],beta=coeffs[1],gamma=coeffs[2])
aggregate_datasets(num_initial_conditions_val, T_val_test, dt, L, N, output_dir, 'val',alpha=coeffs[0],beta=coeffs[1],gamma=coeffs[2])

# To use specific coefficients:
#aggregate_datasets(num_initial_conditions_train, T_train, dt, L, N, output_dir, 'train', alpha=1.0, beta=-1.5, gamma=1.0)

[1.0977216, 1.43569084, 1.33350611]
Using coefficients: alpha=1.0977216, beta=1.43569084, gamma=1.33350611
Generating val dataset with coefficients: alpha=1.0977216, beta=1.43569084, gamma=1.33350611
Generating val dataset for initial condition 1/2...
Simulating KS equation for T=1000, dt=0.05, coeffs=[1.0977216, 1.43569084, 1.33350611]
Simulation completed for T=1000
Generating val dataset for initial condition 2/2...
Simulating KS equation for T=1000, dt=0.05, coeffs=[1.0977216, 1.43569084, 1.33350611]
Simulation completed for T=1000
Saved dataset to ./ks_data/val_inputs_ex17.npy
Saved dataset to ./ks_data/val_outputs_ex17.npy


In [None]:
def generate_input_output_pairs(solution: np.ndarray, input_steps: int, output_steps: int, coeffs: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    inputs = []
    outputs = []
    num_time_steps = solution.shape[0]
    spatial_grid = np.linspace(0, L, solution.shape[1])

    for t in range(input_steps - 1, num_time_steps - output_steps):
        # Input: [t-2, t-1, t]
        input_sequence = solution[t-input_steps+1:t+1]
        input_with_grid = np.stack([*input_sequence, spatial_grid], axis=-1)
        inputs.append(input_with_grid)

        # Output: [t+1, t+2, ..., t+20]
        output_sequence = solution[t+1:t+1+output_steps]
        outputs.append(output_sequence)

    return np.array(inputs), np.array(outputs)

def aggregate_datasets(num_initial_conditions, T, dt, L, N, output_dir, dataset_type, input_steps, output_steps, alpha=None, beta=None, gamma=None):
    all_inputs = []
    all_outputs = []

    coeffs = generate_coeffs(alpha, beta, gamma)
    print(f"Generating {dataset_type} dataset with coefficients: alpha={coeffs[0]}, beta={coeffs[1]}, gamma={coeffs[2]}")

    for i in range(num_initial_conditions):
        print(f"Generating {dataset_type} dataset for initial condition {i + 1}/{num_initial_conditions}...")
        initial_params = generate_initial_params()
        initial_condition = initial_conditions(np.linspace(0, L, N), L, initial_params)
        solution = simulate_ks(L, N, T, dt, initial_condition, coeffs)
        inputs, outputs = generate_input_output_pairs(solution, input_steps, output_steps, coeffs)
        all_inputs.append(inputs)
        all_outputs.append(outputs)

    all_inputs = np.concatenate(all_inputs, axis=0)
    all_outputs = np.concatenate(all_outputs, axis=0)

    save_dataset(all_inputs, os.path.join(output_dir, f'{dataset_type}_inputs_ex1_multi.npy'))
    save_dataset(all_outputs, os.path.join(output_dir, f'{dataset_type}_outputs_ex1_multi.npy'))

# Parameters
L = 100
N = 256
T_train = 2000
T_val_test = 1500
dt = 0.1  # time step for integration
input_steps = 3  # [t-2, t-1, t]
output_steps = 20  # [t+1, t+2, ..., t+20]
output_dir = './ks_data'
os.makedirs(output_dir, exist_ok=True)
num_initial_conditions_train = 5
num_initial_conditions_val = 3
num_initial_conditions_test = 1

# Example usage:
coeffs = [1,1,1]
print(coeffs)
#aggregate_datasets(num_initial_conditions_train, T_train, dt, L, N, output_dir, 'train', input_steps, output_steps, alpha=coeffs[0], beta=coeffs[1], gamma=coeffs[2])
aggregate_datasets(num_initial_conditions_test, T_val_test, dt, L, N, output_dir, 'val',input_steps,output_steps,alpha=coeffs[0],beta=coeffs[1],gamma=coeffs[2])

[1, 1, 1]
Generating val dataset with coefficients: alpha=1, beta=1, gamma=1
Generating val dataset for initial condition 1/1...
Simulating KS equation for T=1500, dt=0.1, coeffs=[1 1 1]
Simulation completed for T=1500
Saved dataset to ./ks_data/val_inputs_ex1_multi.npy
Saved dataset to ./ks_data/val_outputs_ex1_multi.npy


In [8]:
coeffs = [0.82524363, 0.79659697, 0.9230731 ]
def generate_dataset_test(num_samples: int, L: int, N: int, T: float, num_time_steps: int, tol: float, output_dir: str):
    x = np.linspace(0,  L, N)
    t = np.arange(0, T+0.05, 0.05)

    initial_conditions_list = []
    solutions_list = []

    for i in range(num_samples):
        initial_params = generate_initial_params()
        u0 = initial_conditions(x, L, initial_params)
        sol_ps = solve_ivp(fun=pseudospectral, t_span=[t[0], t[-1]], y0=u0, method='Radau', t_eval=t, args=(L,coeffs), atol=tol, rtol=tol)
        initial_conditions_list.append(u0)
        solutions_list.append(sol_ps.y.T)
        if i % 10 == 0:
            print(f"Generated {i} samples")

    initial_conditions_array = np.array(initial_conditions_list)
    max_len = max([sol.shape[0] for sol in solutions_list])
    solutions_array = np.zeros((num_samples, max_len, N))
    for i, sol in enumerate(solutions_list):
        solutions_array[i, :sol.shape[0], :] = sol

    np.save(os.path.join(output_dir, 'test_inputs_ex15.npy'), initial_conditions_array)
    np.save(os.path.join(output_dir, 'test_outputs_ex15.npy'), solutions_array)

    return initial_conditions_array, solutions_array

L = 100
N = 256
T = 100
num_time_steps = 200
tol = 1e-6
num_samples = 1200
output_dir = './ks_data'

test_initial_conditions, test_solutions = generate_dataset_test(1, L, N, T, num_time_steps, tol, output_dir)






Generated 0 samples
